This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 44791f361 Handle container cleanup from ActivationClient shutdown gracefully (#5348)
44791f361 is described below

commit 44791f361d1492e985e9f1bcf3616253c77ed39d
Author: Dominic Kim <[email protected]>
AuthorDate: Fri Nov 4 13:30:58 2022 +0900

    Handle container cleanup from ActivationClient shutdown gracefully (#5348)
    
    * Fix the regression
    
    * Apply scalaFmt
    
    * Fix test cases
    
    * Make the MemoryQueueTests stable
    
    * Make the ActivationClientProxyTests stable
---
 .../containerpool/v2/ActivationClientProxy.scala   | 24 +++++---
 .../v2/FunctionPullingContainerProxy.scala         | 43 ++++++---------
 .../containerpool/v2/InvokerHealthManager.scala    |  2 +-
 .../scheduler/queue/SchedulingDecisionMaker.scala  |  4 +-
 .../v2/test/ActivationClientProxyTests.scala       | 24 +++++++-
 .../test/FunctionPullingContainerProxyTests.scala  | 64 +++++++++++-----------
 .../scheduler/queue/test/MemoryQueueTests.scala    |  2 +-
 7 files changed, 91 insertions(+), 72 deletions(-)

diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
index dbfb7fa26..dc38e10b8 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -18,7 +18,7 @@
 package org.apache.openwhisk.core.containerpool.v2
 
 import akka.actor.Status.{Failure => FailureMessage}
-import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
+import akka.actor.{ActorSystem, FSM, Props, Stash}
 import akka.grpc.internal.ClientClosedException
 import akka.pattern.pipe
 import io.grpc.StatusRuntimeException
@@ -36,7 +36,7 @@ import scala.concurrent.Future
 import scala.util.{Success, Try}
 
 // Event send by the actor
-case class ClientCreationCompleted(client: Option[ActorRef] = None)
+case object ClientCreationCompleted
 case object ClientClosed
 
 // Event received by the actor
@@ -91,12 +91,14 @@ class ActivationClientProxy(
       stay using r
 
     case Event(client: ActivationClient, _) =>
-      context.parent ! ClientCreationCompleted()
+      context.parent ! ClientCreationCompleted
 
       goto(ClientProxyReady) using Client(client.client, client.rpcHost, 
client.rpcPort)
 
     case Event(f: FailureMessage, _) =>
       logging.error(this, s"failed to create grpc client for ${action} caused 
by: $f")
+      context.parent ! f
+
       self ! ClientClosed
 
       goto(ClientProxyRemoving)
@@ -164,9 +166,12 @@ class ActivationClientProxy(
           stay()
 
         case _: ActionMismatch =>
-          logging.error(this, s"[${containerId.asString}] action version does 
not match: $action")
+          val errorMsg = s"[${containerId.asString}] action version does not 
match: $action"
+          logging.error(this, errorMsg)
           c.activationClient.close().andThen {
-            case _ => self ! ClientClosed
+            case _ =>
+              context.parent ! FailureMessage(new RuntimeException(errorMsg))
+              self ! ClientClosed
           }
 
           goto(ClientProxyRemoving)
@@ -194,6 +199,7 @@ class ActivationClientProxy(
         // it would print huge log due to create another grpcClient to fetch 
activation again.
         case t: StatusRuntimeException if 
t.getMessage.contains(ActivationClientProxy.hostResolveError) =>
           logging.error(this, s"[${containerId.asString}] akka grpc server 
connection failed: $t")
+          context.parent ! FailureMessage(t)
           self ! ClientClosed
 
           goto(ClientProxyRemoving)
@@ -208,14 +214,18 @@ class ActivationClientProxy(
 
           stay()
 
-        case _: ClientClosedException =>
+        case t: ClientClosedException =>
           logging.error(this, s"[${containerId.asString}] grpc client is 
already closed for $action")
+          context.parent ! FailureMessage(t)
+
           self ! ClientClosed
 
           goto(ClientProxyRemoving)
 
         case t: Throwable =>
           logging.error(this, s"[${containerId.asString}] get activation from 
remote server error: $t")
+          context.parent ! FailureMessage(t)
+
           safelyCloseClient(c)
           goto(ClientProxyRemoving)
       }
@@ -372,7 +382,7 @@ class ActivationClientProxy(
           logging.debug(this, s"grpc client is closed for $fqn in the Try 
closure")
           Future.successful(ClientClosed)
       }
-      .getOrElse(Future.failed(new Exception(s"error to get $fqn activation 
from grpc server")))
+      .getOrElse(Future.failed(new RuntimeException(s"error to get $fqn 
activation from grpc server")))
   }
 
   private def createActivationClient(invocationNamespace: String,
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 99423581f..988416fc1 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -272,8 +272,7 @@ class FunctionPullingContainerProxy(
               job.rpcPort,
               container.containerId)) match {
             case Success(clientProxy) =>
-              clientProxy ! StartClient
-              ContainerCreatedData(container, job.invocationNamespace, 
job.action)
+              InitializedData(container, job.invocationNamespace, job.action, 
clientProxy)
             case Failure(t) =>
               logging.error(this, s"failed to create activation client caused 
by: $t")
               ClientCreationFailed(t, container, job.invocationNamespace, 
job.action)
@@ -303,7 +302,7 @@ class FunctionPullingContainerProxy(
   // prewarmed state, container created
   when(ContainerCreated) {
     case Event(job: Initialize, data: PreWarmData) =>
-      Try(
+      val res = Try(
         clientProxyFactory(
           context,
           job.invocationNamespace,
@@ -313,13 +312,15 @@ class FunctionPullingContainerProxy(
           job.rpcPort,
           data.container.containerId)) match {
         case Success(proxy) =>
-          proxy ! StartClient
+          InitializedData(data.container, job.invocationNamespace, job.action, 
proxy)
         case Failure(t) =>
           logging.error(this, s"failed to create activation client for 
${job.action} caused by: $t")
-          self ! ClientCreationFailed(t, data.container, 
job.invocationNamespace, job.action)
+          ClientCreationFailed(t, data.container, job.invocationNamespace, 
job.action)
       }
 
-      goto(CreatingClient) using ContainerCreatedData(data.container, 
job.invocationNamespace, job.action)
+      self ! res
+
+      goto(CreatingClient)
 
     case Event(Remove, data: PreWarmData) =>
       cleanUp(data.container, None, false)
@@ -334,27 +335,19 @@ class FunctionPullingContainerProxy(
 
   when(CreatingClient) {
     // wait for client creation when cold start
-    case Event(job: ContainerCreatedData, _: NonexistentData) =>
-      stay() using job
+    case Event(job: InitializedData, _) =>
+      job.clientProxy ! StartClient
 
-    // wait for container creation when cold start
-    case Event(ClientCreationCompleted(proxy), _: NonexistentData) =>
-      akka.pattern.after(3.milliseconds, actorSystem.scheduler) {
-        self ! ClientCreationCompleted(proxy.orElse(Some(sender())))
-        Future.successful({})
-      }
-
-      stay()
+      stay() using job
 
     // client was successfully obtained
-    case Event(ClientCreationCompleted(proxy), data: ContainerCreatedData) =>
-      val clientProxy = proxy.getOrElse(sender())
+    case Event(ClientCreationCompleted, data: InitializedData) =>
       val fqn = data.action.fullyQualifiedName(true)
       val revision = data.action.rev
       dataManagementService ! RegisterData(
         s"${ContainerKeys.existingContainers(data.invocationNamespace, fqn, 
revision, Some(instance), Some(data.container.containerId))}",
         "")
-      self ! InitializedData(data.container, data.invocationNamespace, 
data.action, clientProxy)
+      self ! data
       goto(ClientCreated)
 
     // client creation failed
@@ -362,13 +355,7 @@ class FunctionPullingContainerProxy(
       invokerHealthManager ! HealthMessage(state = false)
       cleanUp(t.container, t.invocationNamespace, 
t.action.fullyQualifiedName(withVersion = true), t.action.rev, None)
 
-    // there can be a case that client create is failed and a ClientClosed 
will be sent by ActivationClientProxy
-    // wait for container creation when cold start
-    case Event(ClientClosed, _: NonexistentData) =>
-      self ! ClientClosed
-      stay()
-
-    case Event(ClientClosed, data: ContainerCreatedData) =>
+    case Event(ClientClosed, data: InitializedData) =>
       invokerHealthManager ! HealthMessage(state = false)
       cleanUp(
         data.container,
@@ -378,7 +365,7 @@ class FunctionPullingContainerProxy(
         None)
 
     // container creation failed when cold start
-    case Event(t: FailureMessage, _) =>
+    case Event(_: FailureMessage, _) =>
       context.parent ! ContainerRemoved(true)
       stop()
 
@@ -518,6 +505,8 @@ class FunctionPullingContainerProxy(
         data.action.fullyQualifiedName(withVersion = true),
         data.action.rev,
         Some(data.clientProxy))
+
+    case x: Event if x.event != PingCache => delay
   }
 
   when(Running) {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
index b0a4ad80e..3554e515d 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -247,7 +247,7 @@ case class HealthActivationServiceClient() extends Actor {
   private var closed: Boolean = false
 
   override def receive: Receive = {
-    case StartClient => sender() ! ClientCreationCompleted()
+    case StartClient => sender() ! ClientCreationCompleted
     case _: RequestActivation =>
       InvokerHealthManager.healthActivation match {
         case Some(activation) if !closed =>
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index 19c458fc9..d5dca8bb7 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -96,7 +96,9 @@ class SchedulingDecisionMaker(
               this,
               s"there is no capacity activations will be dropped or throttled, 
(availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, 
namespaceContainers: ${existingContainerCountInNs}, 
namespaceInProgressContainer: ${inProgressContainerCountInNs}) 
[$invocationNamespace:$action]")
             
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = 
totalContainers == 0), 0))
-          case NamespaceThrottled if 
schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * 
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - 
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
+          case NamespaceThrottled
+              if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
+                limit * 
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - 
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
             Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
           // do nothing
           case _ =>
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
index 9d33a508b..cf8ef15b2 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.containerpool.v2.test
 
 import akka.Done
 import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
+import akka.actor.Status.Failure
 import akka.actor.{ActorRef, ActorSystem}
 import akka.grpc.internal.ClientClosedException
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
@@ -103,7 +104,7 @@ class ActivationClientProxyTests
 
     machine ! StartClient
 
-    probe.expectMsg(ClientCreationCompleted())
+    probe.expectMsg(ClientCreationCompleted)
     probe.expectMsg(Transition(machine, ClientProxyUninitialized, 
ClientProxyReady))
   }
 
@@ -124,6 +125,9 @@ class ActivationClientProxyTests
 
     machine ! StartClient
 
+    probe.expectMsgPF() {
+      case Failure(t) => t.getMessage shouldBe "The number of client creation 
retries has been exceeded."
+    }
     probe.expectMsg(Transition(machine, ClientProxyUninitialized, 
ClientProxyRemoving))
     probe.expectMsg(ClientClosed)
 
@@ -208,7 +212,14 @@ class ActivationClientProxyTests
     ready(machine, probe)
 
     machine ! RequestActivation()
-    probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+
+    inAnyOrder {
+      probe.expectMsg(Transition(machine, ClientProxyReady, 
ClientProxyRemoving))
+      probe.expectMsgPF() {
+        case Failure(t) => t.getMessage.contains(s"action version does not 
match") shouldBe true
+      }
+    }
+
     probe.expectMsg(ClientClosed)
 
     probe expectTerminated machine
@@ -319,7 +330,11 @@ class ActivationClientProxyTests
     ready(machine, probe)
 
     machine ! RequestActivation()
+    probe.expectMsgPF() {
+      case Failure(t) => t.isInstanceOf[ClientClosedException] shouldBe true
+    }
     probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+
     probe.expectMsg(ClientClosed)
 
     probe expectTerminated machine
@@ -343,6 +358,9 @@ class ActivationClientProxyTests
     ready(machine, probe)
 
     machine ! RequestActivation()
+    probe.expectMsgPF() {
+      case Failure(t) => t.getMessage.contains("Unknown exception") shouldBe 
true
+    }
     probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
     probe.expectMsg(ClientClosed)
 
@@ -426,7 +444,7 @@ class ActivationClientProxyTests
 
   def ready(machine: ActorRef, probe: TestProbe) = {
     machine ! StartClient
-    probe.expectMsg(ClientCreationCompleted())
+    probe.expectMsg(ClientCreationCompleted)
     probe.expectMsg(Transition(machine, ClientProxyUninitialized, 
ClientProxyReady))
   }
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 3923a9ed3..a3dbae818 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -444,7 +444,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, transid)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -509,7 +509,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -573,7 +573,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -642,7 +642,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -883,7 +883,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -943,7 +943,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1015,7 +1015,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1094,7 +1094,7 @@ class FunctionPullingContainerProxyTests
 
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1180,7 +1180,7 @@ class FunctionPullingContainerProxyTests
 
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1265,7 +1265,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1342,7 +1342,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1423,7 +1423,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1500,7 +1500,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1576,7 +1576,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     awaitAssert {
       machine.underlyingActor.stateData.getContainer should not be None
@@ -1670,7 +1670,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
     dataManagementService.expectMsg(
       RegisterData(
         ContainerKeys.existingContainers(
@@ -1779,7 +1779,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, noLogsAction, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1910,7 +1910,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -1984,7 +1984,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2061,7 +2061,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2140,7 +2140,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2230,7 +2230,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2293,7 +2293,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2359,7 +2359,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2429,7 +2429,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2512,7 +2512,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2589,7 +2589,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2656,7 +2656,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2742,7 +2742,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2822,7 +2822,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, transid)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2893,7 +2893,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -2955,7 +2955,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
@@ -3021,7 +3021,7 @@ class FunctionPullingContainerProxyTests
     machine ! Initialize(invocationNamespace.asString, fqn, action, 
schedulerHost, rpcPort, messageTransId)
     probe.expectMsg(Transition(machine, ContainerCreated, CreatingClient))
     client.expectMsg(StartClient)
-    client.send(machine, ClientCreationCompleted(Some(client.ref)))
+    client.send(machine, ClientCreationCompleted)
 
     probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
     expectInitialized(probe)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index d7ec5afd1..37191378c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1470,7 +1470,7 @@ class MemoryQueueTests
     fsm ! QueueRemovedCompleted
     parent.expectMsg(10 seconds, Transition(fsm, Removing, Removed))
 
-    probe.expectTerminated(fsm)
+    probe.expectTerminated(fsm, 10 seconds)
   }
 
   it should "throttle the namespace when the limit is already reached" in {

Reply via email to