This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 99fb2c5 Implement ActivationClientProxy (#5119)
99fb2c5 is described below
commit 99fb2c59e264d820d6b95a6319b7882f19aead4d
Author: ningyougang <[email protected]>
AuthorDate: Mon Jun 7 12:51:51 2021 +0800
Implement ActivationClientProxy (#5119)
The ActivationClientProxy actor fetches activation messages from the scheduler
using Akka gRPC and forwards each ActivationMessage to its parent actor,
FunctionPullContainerProxy.
---
.../apache/openwhisk/common/TransactionId.scala | 2 +
.../containerpool/v2/ActivationClientProxy.scala | 397 +++++++++++++++++-
core/scheduler/src/main/protobuf/activation.proto | 15 +-
.../scheduler/grpc/ActivationServiceImpl.scala | 70 ++--
.../v2/test/ActivationClientProxyTests.scala | 459 +++++++++++++++++++++
.../grpc/test/ActivationServiceImplTests.scala | 85 +++-
6 files changed, 969 insertions(+), 59 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index d3389b8..de9c8c0 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -191,6 +191,8 @@ case class TransactionId private (meta:
TransactionMetadata) extends AnyVal {
case Some(parent) => findRoot(parent)
case _ => meta
}
+
+  /**
+   * Serializes this transaction id to compact JSON using `TransactionId.serdes`.
+   * The scheduler decodes it again with `TransactionId.serdes.read(...parseJson)`.
+   */
+  def serialize: String = TransactionId.serdes.write(this).compactPrint
}
/**
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
index 71bc5fa..32f33e7 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -17,15 +17,28 @@
package org.apache.openwhisk.core.containerpool.v2
-import akka.actor.ActorRef
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{ActorRef, ActorSystem, FSM, Props, Stash}
+import akka.grpc.internal.ClientClosedException
+import akka.pattern.pipe
+import akka.stream.ActorMaterializer
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.connector.ActivationMessage
-import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
+import org.apache.openwhisk.core.containerpool.ContainerId
+import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+import org.apache.openwhisk.core.scheduler.grpc.ActivationResponse
+import org.apache.openwhisk.core.scheduler.queue.{ActionMismatch,
MemoryQueueError, NoActivationMessage, NoMemoryQueue}
+import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest,
RescheduleRequest, RescheduleResponse}
+import spray.json.JsonParser.ParsingException
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
// Event send by the actor
+// Sent to the parent once the gRPC client exists (ActivationClientProxy sends it with the default `client = None`).
case class ClientCreationCompleted(client: Option[ActorRef] = None)
+// Sent to the parent right before ActivationClientProxy stops; the proxy also sends it to itself once its client is closed.
case object ClientClosed
-case object CloseClientProxy
// Event received by the actor
+// Tells ActivationClientProxy to create its gRPC client.
case object StartClient
@@ -34,10 +47,382 @@ case class RescheduleActivation(invocationNamespace:
String,
fqn: FullyQualifiedEntityName,
rev: DocRevision,
msg: ActivationMessage)
-
+// Sent to the parent when no activation was available or the fetched message could not be parsed.
case object RetryRequestActivation
+// Marks the container as warmed; fetch requests sent afterwards report `warmed = true`.
case object ContainerWarmed
+// Closes the client and moves the proxy to ClientProxyRemoving.
+case object CloseClientProxy
+// Closes the client without changing state; used for graceful shutdown.
case object StopClientProxy
-// TODO, use grpc to fetch activation from memoryQueue
-class ActivationClientProxy {}
+// state
+/** States of the ActivationClientProxy FSM. */
+sealed trait ActivationClientProxyState
+// No gRPC client yet: StartClient creates one, every other message is stashed.
+case object ClientProxyUninitialized extends ActivationClientProxyState
+// A client exists and activations are long-polled with it.
+case object ClientProxyReady extends ActivationClientProxyState
+// The client is being shut down; the actor stops once ClientClosed is handled.
+case object ClientProxyRemoving extends ActivationClientProxyState
+
+// data
+/** Data carried by the ActivationClientProxy FSM. */
+sealed trait ActivationClientProxyData
+/** The current gRPC client and the scheduler endpoint (host and port) it talks to. */
+final case class Client(activationClient: ActivationServiceClient, rpcHost: String, rpcPort: Int)
+    extends ActivationClientProxyData
+/** Initial data before a client exists. NOTE(review): `count` is not read anywhere in ActivationClientProxy. */
+final case class Retry(count: Int) extends ActivationClientProxyData
+
+/**
+ * FSM actor that owns a gRPC [[ActivationServiceClient]] for one action and container. It uses the client to
+ * long-poll a scheduler for activation messages and forwards them to its parent actor.
+ *
+ * States:
+ *  - ClientProxyUninitialized: `StartClient` builds the client through `activationClientFactory`; the parent gets
+ *    `ClientCreationCompleted()` once it exists. Every other message is stashed until the state changes.
+ *  - ClientProxyReady: `RequestActivation` triggers a fetch; `ActivationMessage`s are forwarded to the parent, and
+ *    scheduler/gRPC errors lead to a retry, a recreated client, or the removing state.
+ *  - ClientProxyRemoving: the client is being shut down; when `ClientClosed` is handled the parent is told and the
+ *    actor stops.
+ *
+ * @param invocationNamespace namespace the activations are fetched for
+ * @param action fully qualified name of the action whose activations are fetched
+ * @param rev action revision sent with every fetch request
+ * @param schedulerHost host of the scheduler contacted first
+ * @param rpcPort gRPC port of the scheduler contacted first
+ * @param containerId id of the container this proxy fetches for, sent with every fetch request
+ * @param activationClientFactory creates a client from (invocationNamespace, action, host, port, tryOtherScheduler)
+ */
+class ActivationClientProxy(
+    invocationNamespace: String,
+    action: FullyQualifiedEntityName,
+    rev: DocRevision,
+    schedulerHost: String,
+    rpcPort: Int,
+    containerId: ContainerId,
+    activationClientFactory: (String, FullyQualifiedEntityName, String, Int, Boolean) => Future[ActivationServiceClient])(
+    implicit actorSystem: ActorSystem,
+    mat: ActorMaterializer,
+    logging: Logging)
+    extends FSM[ActivationClientProxyState, ActivationClientProxyData]
+    with Stash {
+
+  implicit val ec: scala.concurrent.ExecutionContextExecutor = actorSystem.dispatcher
+
+  // Reported with every fetch request; set to true when the parent sends ContainerWarmed.
+  private var warmed = false
+
+  startWith(ClientProxyUninitialized, Retry(3))
+
+  when(ClientProxyUninitialized) {
+    case Event(StartClient, r: Retry) =>
+      // build the activation client against the original scheduler endpoint first
+      createActivationClient(invocationNamespace, action, schedulerHost, rpcPort, tryOtherScheduler = false)
+        .pipeTo(self)
+
+      stay() using r
+
+    case Event(client: ActivationClient, _) =>
+      context.parent ! ClientCreationCompleted()
+
+      goto(ClientProxyReady) using Client(client.client, client.rpcHost, client.rpcPort)
+
+    case Event(f: FailureMessage, _) =>
+      logging.error(this, s"failed to create grpc client for ${action} caused by: $f")
+      self ! ClientClosed
+
+      goto(ClientProxyRemoving)
+
+    // anything else waits until a client exists
+    case _ => delay
+  }
+
+  when(ClientProxyReady) {
+    case Event(request: RequestActivation, client: Client) =>
+      handleRequestActivation(request, client)
+      stay()
+
+    case Event(e: RescheduleActivation, client: Client) =>
+      logging.info(this, s"got a reschedule message ${e.msg.activationId} for action: ${e.msg.action}")
+      // Resolve the parent on the actor thread: `context` must not be used from a Future callback.
+      val parent = context.parent
+      client.activationClient
+        .rescheduleActivation(
+          RescheduleRequest(e.invocationNamespace, e.fqn.serialize, e.rev.serialize, e.msg.serialize))
+        .recover {
+          case t =>
+            logging.error(this, s"Failed to reschedule activation (error: $t)")
+            // Reply with a plain RescheduleResponse (not a Future) so the parent always gets the same message
+            // type; the default value is also what the scheduler returns when it could not reschedule.
+            RescheduleResponse()
+        }
+        .foreach(res => parent ! res)
+      stay()
+
+    case Event(msg: ActivationMessage, _: Client) =>
+      logging.debug(this, s"got a message ${msg.activationId} for action: ${msg.action}")
+      context.parent ! msg
+
+      stay()
+
+    /**
+     * Case of scheduler error
+     */
+    case Event(error: MemoryQueueError, c: Client) =>
+      error match {
+        case _: NoMemoryQueue =>
+          logging.error(
+            this,
+            s"The queue of action ${action} under invocationNamespace ${invocationNamespace} does not exist. Check for queues in other schedulers.")
+          recreateClient(c, c.rpcHost, c.rpcPort, tryOtherScheduler = true)
+
+          stay()
+
+        case _: ActionMismatch =>
+          logging.error(this, s"action version does not match: $action")
+          closeClientAndNotify(c)
+
+          goto(ClientProxyRemoving)
+
+        case _: NoActivationMessage => // retry
+          logging.debug(this, s"no activation message exist: $action")
+          context.parent ! RetryRequestActivation
+
+          stay()
+      }
+
+    /**
+     * Case of system error like grpc, parsing message
+     */
+    case Event(f: FailureMessage, c: Client) =>
+      f.cause match {
+        case t: ParsingException =>
+          logging.error(this, s"failed to parse activation message: $t")
+          context.parent ! RetryRequestActivation
+
+          stay()
+
+        // When the scheduler pod is recreated, a StatusRuntimeException with `Unable to resolve host` occurs.
+        // In that case it is better to stop this proxy; otherwise it would keep creating new gRPC clients to
+        // fetch activations and flood the log within a short time.
+        case t: StatusRuntimeException
+            if Option(t.getMessage).exists(_.contains(ActivationClientProxy.hostResolveError)) =>
+          logging.error(this, s"akka grpc server connection failed: $t")
+          self ! ClientClosed
+
+          goto(ClientProxyRemoving)
+
+        case t: StatusRuntimeException =>
+          logging.error(this, s"akka grpc server connection failed: $t")
+          recreateClient(c, c.rpcHost, c.rpcPort, tryOtherScheduler = true)
+
+          stay()
+
+        case _: ClientClosedException =>
+          logging.error(this, s"grpc client is already closed for $action")
+          self ! ClientClosed
+
+          goto(ClientProxyRemoving)
+
+        case t: Throwable =>
+          logging.error(this, s"get activation from remote server error: $t")
+          safelyCloseClient(c)
+          goto(ClientProxyRemoving)
+      }
+
+    case Event(client: ActivationClient, _) =>
+      // a (re)created client: start long polling with it
+      requestActivationMessage(invocationNamespace, action, rev, client.client)
+        .pipeTo(self)
+
+      stay() using Client(client.client, client.rpcHost, client.rpcPort)
+  }
+
+  when(ClientProxyRemoving) {
+    case Event(request: RequestActivation, client: Client) =>
+      handleRequestActivation(request, client)
+      stay()
+
+    case Event(msg: ActivationMessage, _: Client) =>
+      context.parent ! msg
+
+      stay()
+
+    case Event(_: MemoryQueueError, _: Client) =>
+      self ! ClientClosed
+
+      stay()
+
+    case Event(f: FailureMessage, c: Client) =>
+      logging.error(this, s"some error happened for action: ${action} in state: $stateName, caused by: $f")
+      safelyCloseClient(c)
+      stay()
+
+    case Event(client: ActivationClient, _) =>
+      // long poll
+      requestActivationMessage(invocationNamespace, action, rev, client.client)
+        .pipeTo(self)
+
+      stay() using Client(client.client, client.rpcHost, client.rpcPort)
+  }
+
+  // Unstash all messages stashed while in intermediate state
+  onTransition {
+    case _ -> ClientProxyReady    => unstashAll()
+    case _ -> ClientProxyRemoving => unstashAll()
+  }
+
+  whenUnhandled {
+    case Event(ContainerWarmed, _) =>
+      warmed = true
+      stay()
+
+    case Event(CloseClientProxy, c: Client) =>
+      safelyCloseClient(c)
+      goto(ClientProxyRemoving)
+
+    case Event(ClientClosed, _) =>
+      context.parent ! ClientClosed
+
+      stop()
+
+    case Event(StopClientProxy, c: Client) =>
+      safelyCloseClient(c)
+      stay()
+  }
+
+  initialize()
+
+  /** Delays all incoming messages until unstashAll() is called */
+  def delay: State = {
+    stash()
+    stay()
+  }
+
+  /**
+   * Handles a RequestActivation. If the request names a scheduler endpoint that differs from the one `client`
+   * talks to, the client is closed and recreated against that endpoint. Otherwise an activation is long-polled
+   * with the current client. Either result is piped to self.
+   */
+  private def handleRequestActivation(request: RequestActivation, client: Client): Unit =
+    request.newScheduler match {
+      // if the scheduler changed, the client needs to be recreated
+      case Some(scheduler) if scheduler.host != client.rpcHost || scheduler.rpcPort != client.rpcPort =>
+        recreateClient(client, scheduler.host, scheduler.rpcPort, tryOtherScheduler = false)
+
+      case _ =>
+        requestActivationMessage(invocationNamespace, action, rev, client.activationClient, request.lastDuration)
+          .pipeTo(self)
+    }
+
+  /** Closes `client`, then creates a new client for host:port; the new client (or the failure) is piped to self. */
+  private def recreateClient(client: Client, host: String, port: Int, tryOtherScheduler: Boolean): Unit =
+    client.activationClient
+      .close()
+      .flatMap(_ => createActivationClient(invocationNamespace, action, host, port, tryOtherScheduler))
+      .pipeTo(self)
+
+  /**
+   * Closes `client` and sends `ClientClosed` to self once closing has finished, successfully or not.
+   * If `close()` throws synchronously, `ClientClosed` is sent immediately so the proxy can still stop.
+   */
+  private def closeClientAndNotify(client: Client): Unit =
+    Try(client.activationClient.close()) match {
+      case Success(closing) => closing.andThen { case _ => self ! ClientClosed }
+      case _                => self ! ClientClosed
+    }
+
+  /** Builds a fetch request for this proxy's container, tagged with a freshly generated transaction id. */
+  private def buildFetchRequest(namespace: String,
+                                fqn: FullyQualifiedEntityName,
+                                revision: DocRevision,
+                                lastDuration: Option[Long],
+                                alive: Boolean): FetchRequest =
+    FetchRequest(
+      TransactionId(TransactionId.generateTid()).serialize,
+      namespace,
+      fqn.serialize,
+      revision.serialize,
+      containerId.asString,
+      warmed,
+      lastDuration,
+      alive)
+
+  /**
+   * Safely shut down the client.
+   *
+   * First a last fetch request is sent with `alive = false` (the proto describes `alive` as being used "to record
+   * alive containers"). After that request completes, the client is closed and `ClientClosed` is sent to self.
+   * If the client is already closed, `ClientClosed` is sent right away. Any other synchronous failure is logged
+   * and the client is still closed, so the proxy never waits forever for `ClientClosed`.
+   */
+  private def safelyCloseClient(client: Client): Unit =
+    Try(
+      client.activationClient
+        .fetchActivation(buildFetchRequest(invocationNamespace, action, rev, None, alive = false))) match {
+      case Success(lastFetch) =>
+        lastFetch.andThen { case _ => closeClientAndNotify(client) }
+
+      // fetchActivation on an already-closed client throws before returning a Future, so no andThen would run.
+      case scala.util.Failure(_: ClientClosedException) =>
+        self ! ClientClosed
+
+      case scala.util.Failure(t) =>
+        logging.error(this, s"failed to send the last fetch request for $action: $t")
+        closeClientAndNotify(client)
+    }
+
+  /**
+   * Request activation message to scheduler by long poll
+   *
+   * @return a future completed with the fetched ActivationMessage or MemoryQueueError, with ClientClosed if the
+   *         client was already closed, or failed if the request could not be sent or the response not parsed
+   */
+  private def requestActivationMessage(invocationNamespace: String,
+                                       fqn: FullyQualifiedEntityName,
+                                       rev: DocRevision,
+                                       client: ActivationServiceClient,
+                                       lastDuration: Option[Long] = None): Future[Any] =
+    Try(client.fetchActivation(buildFetchRequest(invocationNamespace, fqn, rev, lastDuration, alive = true))) match {
+      case Success(response) =>
+        response.flatMap { r =>
+          Future(ActivationResponse.parse(r.activationMessage))
+            .flatMap(Future.fromTry)
+            .map {
+              case ActivationResponse(Right(msg))  => msg
+              case ActivationResponse(Left(error)) => error
+            }
+        }
+
+      case scala.util.Failure(_: ClientClosedException) =>
+        logging.debug(this, s"grpc client is closed for $fqn in the Try closure")
+        Future.successful(ClientClosed)
+
+      case scala.util.Failure(t) =>
+        // keep the original error as the cause so it shows up in the logged failure
+        Future.failed(new Exception(s"error to get $fqn activation from grpc server", t))
+    }
+
+  /**
+   * Creates an activation client through `activationClientFactory`, making up to `retry` further attempts if
+   * creation fails.
+   *
+   * @return the client together with the endpoint it was created for, or a failed future (with the last creation
+   *         error as cause) once all retries are used up
+   */
+  private def createActivationClient(invocationNamespace: String,
+                                     fqn: FullyQualifiedEntityName,
+                                     schedulerHost: String,
+                                     rpcPort: Int,
+                                     tryOtherScheduler: Boolean,
+                                     retry: Int = 5): Future[ActivationClient] =
+    activationClientFactory(invocationNamespace, fqn, schedulerHost, rpcPort, tryOtherScheduler)
+      .map(client => ActivationClient(client, schedulerHost, rpcPort))
+      .andThen {
+        case Success(_) => logging.debug(this, "The gRPC client created successfully")
+      }
+      .recoverWith {
+        // count down so that at most `retry` additional attempts are made and the recursion always terminates
+        case t: Throwable if retry > 0 =>
+          logging.warn(this, s"failed to create grpc client for $fqn, retries left: ${retry - 1}, caused by: $t")
+          createActivationClient(invocationNamespace, fqn, schedulerHost, rpcPort, tryOtherScheduler, retry - 1)
+
+        case t: Throwable =>
+          Future.failed(new Exception("The number of client creation retries has been exceeded.", t))
+      }
+}
+
+object ActivationClientProxy {
+
+  /** Message fragment of the gRPC error seen when the scheduler host cannot be resolved (e.g. its pod was recreated). */
+  val hostResolveError = "Unable to resolve host"
+
+  /**
+   * Creates the Props for an ActivationClientProxy actor.
+   *
+   * @param invocationNamespace namespace the activations are fetched for
+   * @param action fully qualified name of the action whose activations are fetched
+   * @param rev action revision sent with every fetch request
+   * @param schedulerHost host of the scheduler contacted first
+   * @param rpcPort gRPC port of the scheduler contacted first
+   * @param containerId id of the container the proxy fetches for
+   * @param activationClientFactory creates a client from (invocationNamespace, action, host, port, tryOtherScheduler)
+   */
+  def props(invocationNamespace: String,
+            action: FullyQualifiedEntityName,
+            rev: DocRevision,
+            schedulerHost: String,
+            rpcPort: Int,
+            containerId: ContainerId,
+            activationClientFactory: (String, FullyQualifiedEntityName, String, Int, Boolean) => Future[ActivationServiceClient])(
+    implicit actorSystem: ActorSystem,
+    mat: ActorMaterializer,
+    logging: Logging): Props =
+    Props(
+      new ActivationClientProxy(
+        invocationNamespace = invocationNamespace,
+        action = action,
+        rev = rev,
+        schedulerHost = schedulerHost,
+        rpcPort = rpcPort,
+        containerId = containerId,
+        activationClientFactory = activationClientFactory))
+}
+
+/**
+ * A gRPC client produced by ActivationClientProxy's client creation, together with the scheduler endpoint
+ * (host and port) it was created for.
+ */
+final case class ActivationClient(client: ActivationServiceClient, rpcHost: String, rpcPort: Int)
diff --git a/core/scheduler/src/main/protobuf/activation.proto
b/core/scheduler/src/main/protobuf/activation.proto
index fb16f48..e79dfda 100644
--- a/core/scheduler/src/main/protobuf/activation.proto
+++ b/core/scheduler/src/main/protobuf/activation.proto
@@ -36,15 +36,16 @@ service ActivationService {
//#messages
// The request message
message FetchRequest {
+ // NOTE(review): this change renumbers every existing field (invocationNamespace 1 -> 2, ..., alive 7 -> 8).
+ // Protobuf identifies fields on the wire by number, so an invoker and a scheduler built from different
+ // versions of this file would misread each other's FetchRequest. If mixed versions can ever talk to each
+ // other (e.g. during a rolling upgrade), keep the old numbers and give transactionId a new, unused number.
- string invocationNamespace = 1;
- string fqn = 2;
- string rev = 3;
- string containerId = 4;
- bool warmed = 5;
+ string transactionId = 1;
+ string invocationNamespace = 2;
+ string fqn = 3;
+ string rev = 4;
+ string containerId = 5;
+ bool warmed = 6;
// This allows optional value
- google.protobuf.Int64Value lastDuration = 6;
+ google.protobuf.Int64Value lastDuration = 7;
// to record alive containers
- bool alive = 7;
+ bool alive = 8;
}
// The response message
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
index d80cd42..acf311e 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala
@@ -20,7 +20,8 @@ package org.apache.openwhisk.core.scheduler.grpc
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
-import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.WarmUp
import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
import org.apache.openwhisk.core.scheduler.queue._
@@ -51,7 +52,7 @@ class ActivationServiceImpl()(implicit actorSystem:
ActorSystem, logging: Loggin
this,
s"Enqueue activation message to reschedule
${request.invocationNamespace} ${request.fqn} ${request.rev}")
queueValue.queue ? res._3
- Future.successful(RescheduleResponse(isRescheduled = true))
+ Future.successful(RescheduleResponse(true))
case None =>
logging.error(this, s"Queue not found for
${request.invocationNamespace} ${request.fqn} ${request.rev}")
Future.successful(RescheduleResponse())
@@ -65,31 +66,43 @@ class ActivationServiceImpl()(implicit actorSystem:
ActorSystem, logging: Loggin
fqn <- FullyQualifiedEntityName.parse(request.fqn)
rev <- DocRevision.parse(request.rev)
} yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res =>
- val key = res._1.toDocId.asDocInfo(res._2)
- QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
- case Some(queueValue) =>
- (queueValue.queue ? GetActivation(
- res._1,
- request.containerId,
- request.warmed,
- request.lastDuration,
- request.alive))
- .mapTo[ActivationResponse]
- .map { response =>
- FetchResponse(response.serialize)
- }
- .recover {
- case t: Throwable =>
- logging.error(this, s"Failed to get message from QueueManager,
error: ${t.getMessage}")
-
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
- }
- case None =>
- if (QueuePool.keys.exists { mkey =>
- mkey.invocationNamespace == request.invocationNamespace &&
mkey.docInfo.id == key.id
- })
-
Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
- else
-
Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
+ val (fqn, rev) = res
+ if (!WarmUp.isWarmUpAction(fqn)) {
+ val key = fqn.toDocId.asDocInfo(rev)
+ QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
+ case Some(queueValue) =>
+ implicit val transid =
TransactionId.serdes.read(request.transactionId.parseJson)
+ (queueValue.queue ? GetActivation(
+ transid,
+ fqn,
+ request.containerId,
+ request.warmed,
+ request.lastDuration,
+ request.alive))
+ .mapTo[ActivationResponse]
+ .map { response =>
+ FetchResponse(response.serialize)
+ }
+ .recover {
+ case t: Throwable =>
+ logging.error(
+ this,
+ s"Failed to get message from QueueManager container:
${request.containerId}, fqn: ${request.fqn}, rev: ${request.rev}, alive:
${request.alive}, lastDuration: ${request.lastDuration}, error:
${t.getMessage}")
+
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+ }
+ case None =>
+ if (QueuePool.keys.exists { mkey =>
+ mkey.invocationNamespace == request.invocationNamespace &&
mkey.docInfo.id == key.id
+ })
+
Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
+ else
+
Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
+ }
+ } else {
+ logging.info(
+ this,
+ s"The ${request.fqn} action is an action used to connect a network
level connection. So response no activation")
+
Future.successful(FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize))
}
}
}
@@ -101,7 +114,8 @@ object ActivationServiceImpl {
new ActivationServiceImpl()
}
-case class GetActivation(action: FullyQualifiedEntityName,
+case class GetActivation(transactionId: TransactionId,
+ action: FullyQualifiedEntityName,
containerId: String,
warmed: Boolean,
lastDuration: Option[Long],
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
new file mode 100644
index 0000000..737aa5d
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/ActivationClientProxyTests.scala
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2.test
+
+import akka.Done
+import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
+import akka.actor.{ActorRef, ActorSystem}
+import akka.grpc.internal.ClientClosedException
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import common.StreamLogging
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.containerpool.ContainerId
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName,
RuntimeManifest}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse =>
AResponse}
+import org.apache.openwhisk.core.scheduler.queue.{ActionMismatch,
NoActivationMessage, NoMemoryQueue}
+import org.apache.openwhisk.grpc
+import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest,
RescheduleRequest, RescheduleResponse}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class ActivationClientProxyTests
+ extends TestKit(ActorSystem("ActivationClientProxy"))
+ with ImplicitSender
+ with FlatSpecLike
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with StreamLogging
+ with ScalaFutures {
+
+ override def afterAll: Unit = TestKit.shutdownActorSystem(system)
+
+ implicit val mat = ActorMaterializer()
+ implicit val ec = system.dispatcher
+
+ val timeout = 20.seconds
+
+ val log = logging
+
+ val exec = CodeExecAsString(RuntimeManifest("actionKind",
ImageName("testImage")), "testCode", None)
+ val action = ExecutableWhiskAction(EntityPath("actionSpace"),
EntityName("actionName"), exec)
+ val fqn = action.fullyQualifiedName(true)
+ val rev = action.rev
+ val schedulerHost = "127.17.0.1"
+ val rpcPort = 13001
+ val containerId = ContainerId("fakeContainerId")
+ val messageTransId = TransactionId(TransactionId.testing.meta.id)
+ val invocationNamespace = EntityName("invocationSpace")
+ val uuid = UUID()
+
+ val message = ActivationMessage(
+ messageTransId,
+ action.fullyQualifiedName(true),
+ action.rev,
+ Identity(Subject(), Namespace(invocationNamespace, uuid),
BasicAuthenticationAuthKey(uuid, Secret()), Set.empty),
+ ActivationId.generate(),
+ ControllerInstanceId("0"),
+ blocking = false,
+ content = None)
+
+ val entityStore = WhiskEntityStore.datastore()
+
+ behavior of "ActivationClientProxy"
+
+ it should "create a grpc client successfully" in within(timeout) {
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) =>
+ Future(MockActivationServiceClient(fetch))
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+
+ machine ! StartClient
+
+ probe.expectMsg(ClientCreationCompleted())
+ probe.expectMsg(Transition(machine, ClientProxyUninitialized,
ClientProxyReady))
+ }
+
+ it should "be closed when failed to create grpc client" in within(timeout) {
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) =>
+ Future {
+ throw new RuntimeException("failed to create client")
+ MockActivationServiceClient(fetch)
+ }
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+
+ machine ! StartClient
+
+ probe.expectMsg(Transition(machine, ClientProxyUninitialized,
ClientProxyRemoving))
+ probe.expectMsg(ClientClosed)
+
+ probe expectTerminated machine
+ }
+
+ it should "fetch activation message successfully" in within(timeout) {
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) =>
+ Future(MockActivationServiceClient(fetch))
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ machine ! RequestActivation()
+ probe.expectMsg(message)
+ }
+
+ it should "be recreated when scheduler is changed" in within(timeout) {
+ var creationCount = 0
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Left(NoMemoryQueue())).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) => {
+ creationCount += 1
+ Future(MockActivationServiceClient(fetch))
+ }
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ // new scheduler is reached
+ machine ! RequestActivation(newScheduler =
Some(SchedulerEndpoints("0.0.0.0", 10, 11)))
+
+ awaitAssert {
+ creationCount should be > 1
+ }
+ }
+
+ it should "be recreated when the queue does not exist" in within(timeout) {
+ var creationCount = 0
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Left(NoMemoryQueue())).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) => {
+ creationCount += 1
+ Future(MockActivationServiceClient(fetch))
+ }
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ machine ! RequestActivation()
+
+ awaitAssert {
+ creationCount should be > 1
+ }
+ }
+
+ it should "be closed when the action version does not match" in
within(timeout) {
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Left(ActionMismatch())).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) =>
+ Future(MockActivationServiceClient(fetch))
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ machine ! RequestActivation()
+ probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+ probe.expectMsg(ClientClosed)
+
+ probe expectTerminated machine
+ }
+
+ it should "retry to request activation message when scheduler response no
activation message" in within(timeout) {
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Left(NoActivationMessage())).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) =>
+ Future(MockActivationServiceClient(fetch))
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ machine ! RequestActivation()
+ probe.expectMsg(RetryRequestActivation)
+ }
+
+ it should "create activation client on other scheduler when the queue does
not exist" in within(timeout) {
+ val createClientOnOtherScheduler = new ArrayBuffer[Boolean]()
+ val fetch = (_: FetchRequest) =>
Future(grpc.FetchResponse(AResponse(Left(NoMemoryQueue())).serialize))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
tryOtherScheduler: Boolean) => {
+ createClientOnOtherScheduler += tryOtherScheduler
+ Future(MockActivationServiceClient(fetch))
+ }
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ machine ! RequestActivation()
+
+ awaitAssert {
+ // Create activation client using original scheduler endpoint firstly
+ createClientOnOtherScheduler(0) shouldBe false
+ // Create activation client using latest scheduler endpoint(try other
scheduler) when no memoryQueue
+ createClientOnOtherScheduler(1) shouldBe true
+ }
+ }
+
+ it should "request activation message when the message can't deserialize" in
within(timeout) {
+ val fetch = (_: FetchRequest) => Future(grpc.FetchResponse("aaaaaa"))
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) =>
+ Future(MockActivationServiceClient(fetch))
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ machine ! RequestActivation()
+ probe.expectMsg(RetryRequestActivation)
+ }
+
+ it should "be recreated when akka grpc server connection failed" in
within(timeout) {
+ var creationCount = 0
+ val fetch = (_: FetchRequest) =>
+ Future {
+ throw new StatusRuntimeException(io.grpc.Status.UNAVAILABLE)
+ grpc.FetchResponse(AResponse(Right(message)).serialize)
+ }
+ val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int,
_: Boolean) => {
+ creationCount += 1
+ Future(MockActivationServiceClient(fetch))
+ }
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ ActivationClientProxy
+ .props(invocationNamespace.asString, fqn, rev, schedulerHost,
rpcPort, containerId, client))
+ registerCallback(machine, probe)
+ ready(machine, probe)
+
+ machine ! RequestActivation()
+
+ awaitAssert {
+ creationCount should be > 1
+ }
+ }
+
+  it should "be closed when grpc client is already closed" in within(timeout) {
+    // Every fetch fails with ClientClosedException, simulating a client that was already closed.
+    // Future.failed replaces a Future { throw ...; value } whose trailing value was dead code.
+    val fetch = (_: FetchRequest) => Future.failed[grpc.FetchResponse](new ClientClosedException())
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+    probe.expectMsg(ClientClosed)
+
+    probe expectTerminated machine
+  }
+
+  it should "be closed when it failed to getting activation from scheduler" in within(timeout) {
+    // Every fetch fails with an unexpected, non-grpc exception.
+    // Future.failed replaces a Future { throw ...; value } whose trailing value was dead code.
+    val fetch = (_: FetchRequest) => Future.failed[grpc.FetchResponse](new Exception("Unknown exception"))
+    val client = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) =>
+      Future(MockActivationServiceClient(fetch))
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        ActivationClientProxy
+          .props(invocationNamespace.asString, fqn, rev, schedulerHost, rpcPort, containerId, client))
+    registerCallback(machine, probe)
+    ready(machine, probe)
+
+    machine ! RequestActivation()
+    probe.expectMsg(Transition(machine, ClientProxyReady, ClientProxyRemoving))
+    probe.expectMsg(ClientClosed)
+
+    probe expectTerminated machine
+  }
+
+  it should "be closed when it receives a CloseClientProxy message for a normal timeout case" in within(timeout) {
+    // A healthy client: every fetch returns the test activation message.
+    val mockClient = MockActivationServiceClient { _ =>
+      Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    }
+    val clientFactory = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) => Future(mockClient)
+
+    val probe = TestProbe()
+    val proxy = probe.childActorOf(
+      ActivationClientProxy.props(
+        invocationNamespace.asString, fqn, rev, schedulerHost, rpcPort, containerId, clientFactory))
+    registerCallback(proxy, probe)
+    ready(proxy, probe)
+
+    // CloseClientProxy is expected to close the grpc client and move the FSM to ClientProxyRemoving.
+    proxy ! CloseClientProxy
+    awaitAssert(mockClient.isClosed shouldBe true)
+    probe.expectMsg(Transition(proxy, ClientProxyReady, ClientProxyRemoving))
+
+    // The next request is made against the closed client; the proxy should then report
+    // ClientClosed to its parent (the probe) and stop.
+    proxy ! RequestActivation()
+    probe.expectMsg(ClientClosed)
+    probe.expectTerminated(proxy)
+  }
+
+  it should "be closed when it receives a StopClientProxy message for the case of graceful shutdown" in within(timeout) {
+    // A healthy client: every fetch returns the test activation message.
+    val mockClient = MockActivationServiceClient { _ =>
+      Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    }
+    val clientFactory = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) => Future(mockClient)
+
+    val probe = TestProbe()
+    val proxy = probe.childActorOf(
+      ActivationClientProxy.props(
+        invocationNamespace.asString, fqn, rev, schedulerHost, rpcPort, containerId, clientFactory))
+    registerCallback(proxy, probe)
+    ready(proxy, probe)
+
+    // A graceful stop is expected to close the grpc client, notify the parent with ClientClosed
+    // and terminate the proxy.
+    proxy ! StopClientProxy
+    awaitAssert(mockClient.isClosed shouldBe true)
+    probe.expectMsg(ClientClosed)
+    probe.expectTerminated(proxy)
+  }
+
+  it should "be safely closed when the client is already closed" in within(timeout) {
+    // A healthy client: every fetch returns the test activation message.
+    val mockClient = MockActivationServiceClient { _ =>
+      Future(grpc.FetchResponse(AResponse(Right(message)).serialize))
+    }
+    val clientFactory = (_: String, _: FullyQualifiedEntityName, _: String, _: Int, _: Boolean) => Future(mockClient)
+
+    val probe = TestProbe()
+    val proxy = probe.childActorOf(
+      ActivationClientProxy.props(
+        invocationNamespace.asString, fqn, rev, schedulerHost, rpcPort, containerId, clientFactory))
+    registerCallback(proxy, probe)
+    ready(proxy, probe)
+
+    // First close the grpc client behind the proxy's back ...
+    mockClient.close().futureValue
+    awaitAssert(mockClient.isClosed shouldBe true)
+
+    // ... then stop the proxy, which closes the client a second time; shutdown must still complete.
+    proxy ! StopClientProxy
+    probe.expectMsg(ClientClosed)
+    probe.expectTerminated(proxy)
+  }
+
+  /**
+   * Subscribes `probe` to the FSM transitions of `c` and asserts that `c` currently reports
+   * ClientProxyUninitialized. It then makes `probe` watch `c` so tests can later expect termination.
+   */
+  def registerCallback(c: ActorRef, probe: TestProbe) = {
+    val initialState = CurrentState(c, ClientProxyUninitialized)
+    c ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(initialState)
+    probe.watch(c)
+  }
+
+  /**
+   * Sends StartClient to `machine` and asserts that the client is created. It then asserts that
+   * the FSM moves from ClientProxyUninitialized to ClientProxyReady.
+   */
+  def ready(machine: ActorRef, probe: TestProbe) = {
+    val expectedTransition = Transition(machine, ClientProxyUninitialized, ClientProxyReady)
+    machine ! StartClient
+    probe.expectMsg(ClientCreationCompleted())
+    probe.expectMsg(expectedTransition)
+  }
+
+  /**
+   * Test double for ActivationServiceClient.
+   *
+   * While open, `fetchActivation` delegates to `customFetchActivation`. After `close()` it returns a
+   * failed Future holding a ClientClosedException. `rescheduleActivation` always succeeds with an
+   * empty RescheduleResponse.
+   *
+   * @param customFetchActivation behaviour of `fetchActivation` while the client is open
+   */
+  case class MockActivationServiceClient(customFetchActivation: FetchRequest => Future[grpc.FetchResponse])
+      extends ActivationServiceClient {
+
+    // Completed by close(). closed() only observes it, so watching the client never closes it.
+    private val closedPromise = scala.concurrent.Promise[Done]()
+
+    // Set by whichever thread calls close() (the proxy or the test) and polled by the test thread
+    // inside awaitAssert, so it must be @volatile for the write to become visible.
+    @volatile var isClosed = false
+
+    override def close(): Future[Done] = {
+      isClosed = true
+      // trySuccess keeps repeated close() calls harmless.
+      closedPromise.trySuccess(Done)
+      closedPromise.future
+    }
+
+    override def closed(): Future[Done] = closedPromise.future
+
+    override def rescheduleActivation(in: RescheduleRequest): Future[RescheduleResponse] =
+      Future.successful(RescheduleResponse())
+
+    override def fetchActivation(in: FetchRequest): Future[grpc.FetchResponse] =
+      // Report the closed state through the returned Future instead of throwing synchronously.
+      if (isClosed) Future.failed(new ClientClosedException())
+      else customFetchActivation(in)
+  }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
index 16c6f1c..b3838d9 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala
@@ -21,6 +21,7 @@ import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit}
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.WarmUp.warmUpAction
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
@@ -28,16 +29,19 @@ import org.apache.openwhisk.core.scheduler.queue.{
ActionMismatch,
MemoryQueueKey,
MemoryQueueValue,
+ NoActivationMessage,
NoMemoryQueue,
QueuePool
}
-import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse,
RescheduleRequest, RescheduleResponse}
+import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike,
Matchers}
import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse,
GetActivation}
import org.scalatest.concurrent.ScalaFutures
+import spray.json.JsonParser.ParsingException
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
@@ -58,6 +62,8 @@ class ActivationServiceImplTests
}
override def beforeEach = QueuePool.clear()
+  /** Blocks for at most `timeout` and returns the future's value, rethrowing its failure. */
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds): T =
+    Await.result(awaitable, timeout)
+
behavior of "ActivationService"
implicit val ec = system.dispatcher
@@ -80,7 +86,7 @@ class ActivationServiceImplTests
blocking = false,
content = None)
- it should "send GetActivation message to the MemoryQueue actor" in {
+ it should "delegate the FetchRequest to a MemoryQueue" in {
val mock = system.actorOf(Props(new Actor() {
override def receive: Receive = {
@@ -93,9 +99,11 @@ class ActivationServiceImplTests
QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc),
MemoryQueueValue(mock, true))
val activationServiceImpl = ActivationServiceImpl()
+ val tid = TransactionId(TransactionId.generateTid())
activationServiceImpl
.fetchActivation(
FetchRequest(
+ tid.serialize,
message.user.namespace.name.asString,
testFQN.serialize,
testDocRevision.serialize,
@@ -104,15 +112,16 @@ class ActivationServiceImplTests
alive = true))
.futureValue shouldBe
FetchResponse(ActivationResponse(Right(message)).serialize)
- expectMsg(GetActivation(testFQN, testContainerId, false, None))
+ expectMsg(GetActivation(tid, testFQN, testContainerId, false, None))
}
- it should "return NoMemoryQueue if there is no queue" in {
+ it should "return without any retry if there is no such queue" in {
val activationServiceImpl = ActivationServiceImpl()
activationServiceImpl
.fetchActivation(
FetchRequest(
+ TransactionId(TransactionId.generateTid()).serialize,
message.user.namespace.name.asString,
testFQN.serialize,
testDocRevision.serialize,
@@ -133,6 +142,7 @@ class ActivationServiceImplTests
activationServiceImpl
.fetchActivation(
FetchRequest( // same doc id but with a different doc revision
+ TransactionId(TransactionId.generateTid()).serialize,
message.user.namespace.name.asString,
testFQN.serialize,
DocRevision("new-one").serialize,
@@ -144,28 +154,67 @@ class ActivationServiceImplTests
expectNoMessage(200.millis)
}
- it should "reschedule activation message to the queue" in {
+ it should "return NoActivationMessage if it is a warm-up action" in {
+ // A fetch for the warm-up action is expected to be answered with NoActivationMessage,
+ // and no message may reach the registered queue actor (testActor) within 200ms.
- val mock = system.actorOf(Props(new Actor() {
- override def receive: Receive = {
- case message: ActivationMessage =>
- testActor ! message
- }
- }))
val activationServiceImpl = ActivationServiceImpl()
- QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc),
MemoryQueueValue(mock, true))
+ QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc),
MemoryQueueValue(testActor, true))
activationServiceImpl
- .rescheduleActivation(
- RescheduleRequest( // same doc id but with a different doc revision
+ .fetchActivation(
+ FetchRequest(
+ TransactionId(TransactionId.generateTid()).serialize,
message.user.namespace.name.asString,
- testFQN.serialize,
+ warmUpAction.serialize,
testDocRevision.serialize,
- message.serialize))
- .futureValue shouldBe RescheduleResponse(isRescheduled = true)
+ testContainerId,
+ false,
+ alive = true))
+ .futureValue shouldBe
FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
+
+ expectNoMessage(200.millis)
+ }
+
+  it should "throw parsing error if fqn can't be parsed" in {
+    val activationServiceImpl = ActivationServiceImpl()
+    QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true))
+
+    // This string is not a serialized FullyQualifiedEntityName, so the call is expected to
+    // raise a ParsingException.
+    val malformedFqn = "aaaaaaaaa"
+    val request = FetchRequest(
+      TransactionId(TransactionId.generateTid()).serialize,
+      message.user.namespace.name.asString,
+      malformedFqn,
+      testDocRevision.serialize,
+      testContainerId,
+      false,
+      alive = true)
+
+    a[ParsingException] should be thrownBy await(activationServiceImpl.fetchActivation(request))
+  }
+
+ it should "throw parsing error if rev can't be parsed" in {
+ // The revision argument is not a serialized DocRevision, so the call is expected to raise a ParsingException.
+ val notParsableRev = "aaaaaaaaa"
+
+ val activationServiceImpl = ActivationServiceImpl()
+
+ QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc),
MemoryQueueValue(testActor, true))
- expectMsg(message)
+ a[ParsingException] should be thrownBy await {
+ activationServiceImpl
+ .fetchActivation(
+ FetchRequest(
+ TransactionId(TransactionId.generateTid()).serialize,
+ message.user.namespace.name.asString,
+ testFQN.serialize,
+ notParsableRev,
+ testContainerId,
+ false,
+ alive = true))
+ }
}
}