This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 44791f361 Handle container cleanup from ActivationClient shutdown
gracefully (#5348)
44791f361 is described below
commit 44791f361d1492e985e9f1bcf3616253c77ed39d
Author: Dominic Kim <[email protected]>
AuthorDate: Fri Nov 4 13:30:58 2022 +0900
Handle container cleanup from ActivationClient shutdown gracefully (#5348)
* Fix the regression
* Apply scalaFmt
* Fix test cases
* Make the MemoryQueueTests stable
* Make the ActivationClientProxyTests stable
---
.../containerpool/v2/ActivationClientProxy.scala | 24 +++++---
.../v2/FunctionPullingContainerProxy.scala | 43 ++++++---------
.../containerpool/v2/InvokerHealthManager.scala | 2 +-
.../scheduler/queue/SchedulingDecisionMaker.scala | 4 +-
.../v2/test/ActivationClientProxyTests.scala | 24 +++++++-
.../test/FunctionPullingContainerProxyTests.scala | 64 +++++++++++-----------
.../scheduler/queue/test/MemoryQueueTests.scala | 2 +-
7 files changed, 91 insertions(+), 72 deletions(-)
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
index dbfb7fa26..dc38e10b8 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -18,7 +18,7 @@
package org.apache.openwhisk.core.containerpool.v2
import akka.actor.Status.{Failure => FailureMessage}
-import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
+import akka.actor.{ActorSystem, FSM, Props, Stash}
import akka.grpc.internal.ClientClosedException
import akka.pattern.pipe
import io.grpc.StatusRuntimeException
@@ -36,7 +36,7 @@ import scala.concurrent.Future
import scala.util.{Success, Try}
// Event send by the actor
-case class ClientCreationCompleted(client: Option[ActorRef] = None)
+case object ClientCreationCompleted
case object ClientClosed
// Event received by the actor
@@ -91,12 +91,14 @@ class ActivationClientProxy(
stay using r
case Event(client: ActivationClient, _) =>
- context.parent ! ClientCreationCompleted()
+ context.parent ! ClientCreationCompleted
goto(ClientProxyReady) using Client(client.client, client.rpcHost,
client.rpcPort)
case Event(f: FailureMessage, _) =>
logging.error(this, s"failed to create grpc client for ${action} caused
by: $f")
+ context.parent ! f
+
self ! ClientClosed
goto(ClientProxyRemoving)
@@ -164,9 +166,12 @@ class ActivationClientProxy(
stay()
case _: ActionMismatch =>
- logging.error(this, s"[${containerId.asString}] action version does
not match: $action")
+ val errorMsg = s"[${containerId.asString}] action version does not
match: $action"
+ logging.error(this, errorMsg)
c.activationClient.close().andThen {
- case _ => self ! ClientClosed
+ case _ =>
+ context.parent ! FailureMessage(new RuntimeException(errorMsg))
+ self ! ClientClosed
}
goto(ClientProxyRemoving)
@@ -194,6 +199,7 @@ class ActivationClientProxy(
// it would print huge log due to create another grpcClient to fetch
activation again.
case t: StatusRuntimeException if
t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
logging.error(this, s"[${containerId.asString}] akka grpc server
connection failed: $t")
+ context.parent ! FailureMessage(t)
self ! ClientClosed
goto(ClientProxyRemoving)
@@ -208,14 +214,18 @@ class ActivationClientProxy(
stay()
- case _: ClientClosedException =>
+ case t: ClientClosedException =>
logging.error(this, s"[${containerId.asString}] grpc client is
already closed for $action")
+ context.parent ! FailureMessage(t)
+
self ! ClientClosed
goto(ClientProxyRemoving)
case t: Throwable =>
logging.error(this, s"[${containerId.asString}] get activation from
remote server error: $t")
+ context.parent ! FailureMessage(t)
+
safelyCloseClient(c)
goto(ClientProxyRemoving)
}
@@ -372,7 +382,7 @@ class ActivationClientProxy(
logging.debug(this, s"grpc client is closed for $fqn in the Try
closure")
Future.successful(ClientClosed)
}
- .getOrElse(Future.failed(new Exception(s"error to get $fqn activation
from grpc server")))
+ .getOrElse(Future.failed(new RuntimeException(s"error to get $fqn
activation from grpc server")))
}
private def createActivationClient(invocationNamespace: String,
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 99423581f..988416fc1 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -272,8 +272,7 @@ class FunctionPullingContainerProxy(
job.rpcPort,
container.containerId)) match {
case Success(clientProxy) =>
- clientProxy ! StartClient
- ContainerCreatedData(container, job.invocationNamespace,
job.action)
+ InitializedData(container, job.invocationNamespace, job.action,
clientProxy)
case Failure(t) =>
logging.error(this, s"failed to create activation client caused
by: $t")
ClientCreationFailed(t, container, job.invocationNamespace,
job.action)
@@ -303,7 +302,7 @@ class FunctionPullingContainerProxy(
// prewarmed state, container created
when(ContainerCreated) {
case Event(job: Initialize, data: PreWarmData) =>
- Try(
+ val res = Try(
clientProxyFactory(
context,
job.invocationNamespace,
@@ -313,13 +312,15 @@ class FunctionPullingContainerProxy(
job.rpcPort,
data.container.containerId)) match {
case Success(proxy) =>
- proxy ! StartClient
+ InitializedData(data.container, job.invocationNamespace, job.action,
proxy)
case Failure(t) =>
logging.error(this, s"failed to create activation client for
${job.action} caused by: $t")
- self ! ClientCreationFailed(t, data.container,
job.invocationNamespace, job.action)
+ ClientCreationFailed(t, data.container, job.invocationNamespace,
job.action)
}
- goto(CreatingClient) using ContainerCreatedData(data.container,
job.invocationNamespace, job.action)
+ self ! res
+
+ goto(CreatingClient)
case Event(Remove, data: PreWarmData) =>
cleanUp(data.container, None, false)
@@ -334,27 +335,19 @@ class FunctionPullingContainerProxy(
when(CreatingClient) {
// wait for client creation when cold start
- case Event(job: ContainerCreatedData, _: NonexistentData) =>
- stay() using job
+ case Event(job: InitializedData, _) =>
+ job.clientProxy ! StartClient
- // wait for container creation when cold start
- case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
- akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
- self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
- Future.successful({})
- }
-
- stay()
+ stay() using job
// client was successfully obtained
- case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) =>
- val clientProxy = proxy.getOrElse(sender())
+ case Event(ClientCreationCompleted, data: InitializedData) =>
val fqn = data.action.fullyQualifiedName(true)
val revision = data.action.rev
dataManagementService ! RegisterData(
s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn,
revision, Some(instance), Some(data.container.containerId))}",
"")
- self ! InitializedData(data.container, data.invocationNamespace,
data.action, clientProxy)
+ self ! data
goto(ClientCreated)
// client creation failed
@@ -362,13 +355,7 @@ class FunctionPullingContainerProxy(
invokerHealthManager ! HealthMessage(state = false)
cleanUp(t.container, t.invocationNamespace,
t.action.fullyQualifiedName(withVersion = true), t.action.rev, None)
- // there can be a case that client create is failed and a ClientClosed
will be sent by ActivationClientProxy
- // wait for container creation when cold start
- case Event(ClientClosed, _: NonexistentData) =>
- self ! ClientClosed
- stay()
-
- case Event(ClientClosed, data: ContainerCreatedData) =>
+ case Event(ClientClosed, data: InitializedData) =>
invokerHealthManager ! HealthMessage(state = false)
cleanUp(
data.container,
@@ -378,7 +365,7 @@ class FunctionPullingContainerProxy(
None)
// container creation failed when cold start
- case Event(t: FailureMessage, _) =>
+ case Event(_: FailureMessage, _) =>
context.parent ! ContainerRemoved(true)
stop()
@@ -518,6 +505,8 @@ class FunctionPullingContainerProxy(
data.action.fullyQualifiedName(withVersion = true),
data.action.rev,
Some(data.clientProxy))
+
+ case x: Event if x.event != PingCache => delay
}
when(Running) {
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index b0a4ad80e..3554e515d 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -247,7 +247,7 @@ case class HealthActivationServiceClient() extends Actor {
private var closed: Boolean = false
override def receive: Receive = {
- case StartClient => sender() ! ClientCreationCompleted()
+ case StartClient => sender() ! ClientCreationCompleted
case _: RequestActivation =>
InvokerHealthManager.healthActivation match {
case Some(activation) if !closed =>
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index 19c458fc9..d5dca8bb7 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -96,7 +96,9 @@ class SchedulingDecisionMaker(
this,
s"there is no capacity activations will be dropped or throttled,
(availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit,
namespaceContainers: ${existingContainerCountInNs},
namespaceInProgressContainer: ${inProgressContainerCountInNs})
[$invocationNamespace:$action]")
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg =
totalContainers == 0), 0))
- case NamespaceThrottled if
schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit *
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
+ case NamespaceThrottled
+ if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
+ limit *
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
// do nothing
case _ =>
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
index 9d33a508b..cf8ef15b2 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.containerpool.v2.test
import akka.Done
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
+import akka.actor.Status.Failure
import akka.actor.{ActorRef, ActorSystem}
import akka.grpc.internal.ClientClosedException
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
@@ -103,7 +104,7 @@ class ActivationClientProxyTests
machine ! StartClient
- probe.expectMsg(ClientCreationCompleted())
+ probe.expectMsg(ClientCreationCompleted)
probe.expectMsg(Transition(machine, ClientProxyUninitialized,
ClientProxyReady))
}
@@ -124,6 +125,9 @@ class ActivationClientProxyTests
machine ! StartClient
+ probe.expectMsgPF() {
+ case Failure(t) => t.getMessage shouldBe "The number of client creation
retries has been exceeded."
+ }
probe.expectMsg(Transition(machine, ClientProxyUninitialized,
ClientProxyRemoving))
probe.expectMsg(ClientClosed)
@@ -208,7 +212,14 @@ class ActivationClientProxyTests
ready(machine, probe)
machine ! RequestActivation()
- probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+
+ inAnyOrder {
+ probe.expectMsg(Transition(machine, ClientProxyReady,
ClientProxyRemoving))
+ probe.expectMsgPF() {
+ case Failure(t) => t.getMessage.contains(s"action version does not
match") shouldBe true
+ }
+ }
+
probe.expectMsg(ClientClosed)
probe expectTerminated machine
@@ -319,7 +330,11 @@ class ActivationClientProxyTests
ready(machine, probe)
machine ! RequestActivation()
+ probe.expectMsgPF() {
+ case Failure(t) => t.isInstanceOf[ClientClosedException] shouldBe true
+ }
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+
probe.expectMsg(ClientClosed)
probe expectTerminated machine
@@ -343,6 +358,9 @@ class ActivationClientProxyTests
ready(machine, probe)
machine ! RequestActivation()
+ probe.expectMsgPF() {
+ case Failure(t) => t.getMessage.contains("Unknown exception") shouldBe
true
+ }
probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
probe.expectMsg(ClientClosed)
@@ -426,7 +444,7 @@ class ActivationClientProxyTests
def ready(machine: ActorRef, probe: TestProbe) = {
machine ! StartClient
- probe.expectMsg(ClientCreationCompleted())
+ probe.expectMsg(ClientCreationCompleted)
probe.expectMsg(Transition(machine, ClientProxyUninitialized,
ClientProxyReady))
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 3923a9ed3..a3dbae818 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -444,7 +444,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, transid)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -509,7 +509,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -573,7 +573,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -642,7 +642,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -883,7 +883,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -943,7 +943,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1015,7 +1015,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1094,7 +1094,7 @@ class FunctionPullingContainerProxyTests
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1180,7 +1180,7 @@ class FunctionPullingContainerProxyTests
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1265,7 +1265,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1342,7 +1342,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1423,7 +1423,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1500,7 +1500,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1576,7 +1576,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
awaitAssert {
machine.underlyingActor.stateData.getContainer should not be None
@@ -1670,7 +1670,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
dataManagementService.expectMsg(
RegisterData(
ContainerKeys.existingContainers(
@@ -1779,7 +1779,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, noLogsAction,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1910,7 +1910,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -1984,7 +1984,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2061,7 +2061,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2140,7 +2140,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2230,7 +2230,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2293,7 +2293,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2359,7 +2359,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2429,7 +2429,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2512,7 +2512,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2589,7 +2589,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2656,7 +2656,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2742,7 +2742,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2822,7 +2822,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, transid)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2893,7 +2893,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -2955,7 +2955,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
@@ -3021,7 +3021,7 @@ class FunctionPullingContainerProxyTests
machine ! Initialize(invocationNamespace.asString, fqn, action,
schedulerHost, rpcPort, messageTransId)
probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
client.expectMsg(StartClient)
- client.send(machine, ClientCreationCompleted(Some(client.ref)))
+ client.send(machine, ClientCreationCompleted)
probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index d7ec5afd1..37191378c 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1470,7 +1470,7 @@ class MemoryQueueTests
fsm ! QueueRemovedCompleted
parent.expectMsg(10 seconds, Transition(fsm, Removing, Removed))
- probe.expectTerminated(fsm)
+ probe.expectTerminated(fsm, 10 seconds)
}
it should "throttle the namespace when the limit is already reached" in {