This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new fac309c Combines active ack and slot release when both are available.
(#4624)
fac309c is described below
commit fac309c78a99c227978172279794ac34641cc4dc
Author: rodric rabbah <[email protected]>
AuthorDate: Sat Sep 21 01:10:45 2019 -0700
Combines active ack and slot release when both are available. (#4624)
Combine active ack and slot release when both are available. This commit
changes the set of AcknowledegmentMessage types exchanged on `completedxxx`
topics to the following three:
- CombinedCompletionAndResultMessage - Sent when the resource slot and the
action result are available at the same time
- ResultMessage - Sent once an action result is available for blocking
actions
- CompletionMessage - Sent once the resource slot in the invoker is free
again
This ensures that the controller can quickly clean up resources for
completed invocations when they result in an error
(instead of performing slow db polling).
---
.../apache/openwhisk/core/connector/Message.scala | 214 +++++++++++++++------
.../org/apache/openwhisk/core/entity/Exec.scala | 2 +-
.../core/loadBalancer/CommonLoadBalancer.scala | 29 +--
.../core/containerpool/ContainerProxy.scala | 27 ++-
.../openwhisk/core/invoker/InvokerReactive.scala | 114 ++++++-----
.../tests/AcknowledgementMessageTests.scala | 117 ++++++-----
.../containerpool/test/ContainerProxyTests.scala | 70 +++++--
.../test/ShardingContainerPoolBalancerTests.scala | 4 +-
8 files changed, 393 insertions(+), 184 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 13fd435..9a1a586 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -64,101 +64,189 @@ case class ActivationMessage(override val transid:
TransactionId,
def causedBySequence: Boolean = cause.isDefined
}
-object ActivationMessage extends DefaultJsonProtocol {
-
- def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
- private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
- implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
/**
* Message that is sent from the invoker to the controller after action is
completed or after slot is free again for
* new actions.
*/
abstract class AcknowledegmentMessage(private val tid: TransactionId) extends
Message {
override val transid: TransactionId = tid
- override def serialize: String = {
- AcknowledegmentMessage.serdes.write(this).compactPrint
- }
+ override def serialize: String =
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+ /** Pithy descriptor for logging. */
+ def messageType: String
+
+ /** Does message indicate slot is free? */
+ def isSlotFree: Option[InvokerInstanceId]
+
+ /** Does message contain a result? */
+ def result: Option[Either[ActivationId, WhiskActivation]]
+
+ /**
+ * Is the acknowledgement for an activation that failed internally?
+ * For some messages, this is not relevant and the result is None.
+ */
+ def isSystemError: Option[Boolean]
+
+ def activationId: ActivationId
+
+ /** Serializes the message to JSON. */
+ def toJson: JsValue
+
+ /**
+ * Converts the message to a more compact form if it cannot cross the
message bus as is or some of its details are not necessary.
+ */
+ def shrink: AcknowledegmentMessage
}
/**
- * This message is sent from the invoker to the controller, after the slot of
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situations when
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
+ *
+ * The constructor is private so that callers must use the more restrictive
constructors which ensure the response is always
+ * Right when this message is created.
*/
-case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
- invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage private (override val transid:
TransactionId,
+ response:
Either[ActivationId, WhiskActivation],
+ override val
isSystemError: Option[Boolean],
+ invoker:
InvokerInstanceId)
extends AcknowledegmentMessage(transid) {
-
- override def toString = {
- activationId.asString
- }
+ override def messageType = "combined"
+ override def result = Some(response)
+ override def isSlotFree = Some(invoker)
+ override def activationId = response.fold(identity, _.activationId)
+ override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+ override def shrink = copy(response = response.flatMap(a =>
Left(a.activationId)))
+ override def toString = activationId.asString
}
-object CompletionMessage extends DefaultJsonProtocol {
- def parse(msg: String): Try[CompletionMessage] =
Try(serdes.read(msg.parseJson))
- implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource
slot in the invoker (used by the
+ * corresponding activation) is free again (i.e., after log collection). The
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage private (override val transid: TransactionId,
+ override val activationId: ActivationId,
+ override val isSystemError:
Option[Boolean],
+ invoker: InvokerInstanceId)
+ extends AcknowledegmentMessage(transid) {
+ override def messageType = "completion"
+ override def result = None
+ override def isSlotFree = Some(invoker)
+ override def toJson = CompletionMessage.serdes.write(this)
+ override def shrink = this
+ override def toString = activationId.asString
}
/**
- * That message will be sent from the invoker to the controller after action
completion if the user wants to have
- * the result immediately (blocking activation).
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * This message is sent from an invoker to the load balancer once an action
result is available for blocking actions.
+ * This is part of a split phase notification, and does not indicate that the
slot is available, which is indicated with
+ * a `CompletionMessage`. Note that activation record will not contain any
logs from the action execution, only the result.
+ *
+ * The constructor is private so that callers must use the more restrictive
constructors which ensure the response is always
+ * Right when this message is created.
*/
-case class ResultMessage(override val transid: TransactionId, response:
Either[ActivationId, WhiskActivation])
+case class ResultMessage private (override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
+ override def messageType = "result"
+ override def result = Some(response)
+ override def isSlotFree = None
+ override def isSystemError = response.fold(_ => None, a =>
Some(a.response.isWhiskError))
+ override def activationId = response.fold(identity, _.activationId)
+ override def toJson = ResultMessage.serdes.write(this)
+ override def shrink = copy(response = response.flatMap(a =>
Left(a.activationId)))
+ override def toString = activationId.asString
+}
- override def toString = {
- response.fold(l => l, r => r.activationId).asString
+object ActivationMessage extends DefaultJsonProtocol {
+ def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
+ private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+ implicit val serdes = jsonFormat11(ActivationMessage.apply)
+}
+
+object CombinedCompletionAndResultMessage extends DefaultJsonProtocol {
+ // this constructor is restricted to ensure the message is always created
with certain invariants
+ private def apply(transid: TransactionId,
+ activation: Either[ActivationId, WhiskActivation],
+ isSystemError: Option[Boolean],
+ invoker: InvokerInstanceId):
CombinedCompletionAndResultMessage =
+ new CombinedCompletionAndResultMessage(transid, activation, isSystemError,
invoker)
+
+ def apply(transid: TransactionId,
+ activation: WhiskActivation,
+ invoker: InvokerInstanceId): CombinedCompletionAndResultMessage =
+ new CombinedCompletionAndResultMessage(transid, Right(activation),
Some(activation.response.isWhiskError), invoker)
+
+ implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+ implicit val serdes = jsonFormat4(
+ CombinedCompletionAndResultMessage
+ .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _:
Option[Boolean], _: InvokerInstanceId))
+}
+
+object CompletionMessage extends DefaultJsonProtocol {
+ // this constructor is restricted to ensure the message is always created
with certain invariants
+ private def apply(transid: TransactionId,
+ activation: WhiskActivation,
+ isSystemError: Option[Boolean],
+ invoker: InvokerInstanceId): CompletionMessage =
+ new CompletionMessage(transid, activation.activationId,
Some(activation.response.isWhiskError), invoker)
+
+ def apply(transid: TransactionId, activation: WhiskActivation, invoker:
InvokerInstanceId): CompletionMessage = {
+ new CompletionMessage(transid, activation.activationId,
Some(activation.response.isWhiskError), invoker)
}
+
+ implicit val serdes = jsonFormat4(
+ CompletionMessage.apply(_: TransactionId, _: ActivationId, _:
Option[Boolean], _: InvokerInstanceId))
}
object ResultMessage extends DefaultJsonProtocol {
- implicit def eitherResponse =
- new JsonFormat[Either[ActivationId, WhiskActivation]] {
- def write(either: Either[ActivationId, WhiskActivation]) = either match {
- case Right(a) => a.toJson
- case Left(b) => b.toJson
- }
+ // this constructor is restricted to ensure the message is always created
with certain invariants
+ private def apply(transid: TransactionId, response: Either[ActivationId,
WhiskActivation]): ResultMessage =
+ new ResultMessage(transid, response)
- def read(value: JsValue) = value match {
- // per the ActivationId's serializer, it is guaranteed to be a String
even if it only consists of digits
- case _: JsString => Left(value.convertTo[ActivationId])
- case _: JsObject => Right(value.convertTo[WhiskActivation])
- case _ => deserializationError("could not read
ResultMessage")
- }
- }
+ def apply(transid: TransactionId, activation: WhiskActivation):
ResultMessage =
+ new ResultMessage(transid, Right(activation))
- def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
- implicit val serdes = jsonFormat2(ResultMessage.apply)
+ implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+ implicit val serdes = jsonFormat2(ResultMessage.apply(_: TransactionId, _:
Either[ActivationId, WhiskActivation]))
}
object AcknowledegmentMessage extends DefaultJsonProtocol {
- def parse(msg: String): Try[AcknowledegmentMessage] = {
- Try(serdes.read(msg.parseJson))
+ def parse(msg: String): Try[AcknowledegmentMessage] =
Try(serdes.read(msg.parseJson))
+
+ protected[connector] val eitherResponse = new
JsonFormat[Either[ActivationId, WhiskActivation]] {
+ def write(either: Either[ActivationId, WhiskActivation]) =
either.fold(_.toJson, _.toJson)
+
+ def read(value: JsValue) = value match {
+ case _: JsString =>
+ // per the ActivationId serializer, an activation id is a String even
if it only consists of digits
+ Left(value.convertTo[ActivationId])
+
+ case _: JsObject => Right(value.convertTo[WhiskActivation])
+ case _ => deserializationError("could not read ResultMessage")
+ }
}
implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
- override def write(obj: AcknowledegmentMessage): JsValue = {
- obj match {
- case c: CompletionMessage => c.toJson
- case r: ResultMessage => r.toJson
- }
- }
+ override def write(m: AcknowledegmentMessage): JsValue = m.toJson
+ // The field invoker is only part of CombinedCompletionAndResultMessage
and CompletionMessage.
+ // If this field is part of the JSON, we try to deserialize into one of
these two types,
+ // and otherwise to a ResultMessage. If all conversions fail, an error
will be thrown that needs to be handled.
override def read(json: JsValue): AcknowledegmentMessage = {
- json.asJsObject
- // The field invoker is only part of the CompletionMessage. If this
field is part of the JSON, we try to convert
- // it to a CompletionMessage. Otherwise to a ResultMessage.
- // If both conversions fail, an error will be thrown that needs to be
handled.
- .getFields("invoker")
- .headOption
- .map(_ => json.convertTo[CompletionMessage])
- .getOrElse(json.convertTo[ResultMessage])
+ val JsObject(fields) = json
+ val completion = fields.contains("invoker")
+ val result = fields.contains("response")
+ if (completion && result) {
+ json.convertTo[CombinedCompletionAndResultMessage]
+ } else if (completion) {
+ json.convertTo[CompletionMessage]
+ } else {
+ json.convertTo[ResultMessage]
+ }
}
}
}
@@ -178,7 +266,7 @@ trait EventMessageBody extends Message {
object EventMessageBody extends DefaultJsonProtocol {
- implicit def format = new JsonFormat[EventMessageBody] {
+ implicit val format = new JsonFormat[EventMessageBody] {
def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
case m: Metric => m.toJson
case a: Activation => a.toJson
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala
index e9be6cf..6830399 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/Exec.scala
@@ -242,7 +242,7 @@ object Exec extends ArgNormalizer[Exec] with
DefaultJsonProtocol {
protected[core] val SEQUENCE = "sequence"
protected[core] val BLACKBOX = "blackbox"
- // This is for error cases while cannot get the `kind` of Exec
+ // This is for error cases where the action `kind` may not be known.
protected[core] val UNKNOWN = "unknown"
private def execManifests = ExecManifest.runtimesManifest
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 51bf4a8..2f6652e 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -204,17 +204,20 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]):
Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
AcknowledegmentMessage.parse(raw) match {
- case Success(m: CompletionMessage) =>
- processCompletion(
- m.activationId,
- m.transid,
- forced = false,
- isSystemError = m.isSystemError,
- invoker = m.invoker)
- activationFeed ! MessageFeed.Processed
+ case Success(acknowledegment) =>
+ acknowledegment.isSlotFree.foreach { invoker =>
+ processCompletion(
+ acknowledegment.activationId,
+ acknowledegment.transid,
+ forced = false,
+ isSystemError = acknowledegment.isSystemError.getOrElse(false),
+ invoker)
+ }
+
+ acknowledegment.result.foreach { response =>
+ processResult(acknowledegment.activationId, acknowledegment.transid,
response)
+ }
- case Success(m: ResultMessage) =>
- processResult(m.response, m.transid)
activationFeed ! MessageFeed.Processed
case Failure(t) =>
@@ -228,9 +231,9 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
}
/** 5. Process the result ack and return it to the user */
- protected def processResult(response: Either[ActivationId, WhiskActivation],
tid: TransactionId): Unit = {
- val aid = response.fold(l => l, r => r.activationId)
-
+ protected def processResult(aid: ActivationId,
+ tid: TransactionId,
+ response: Either[ActivationId,
WhiskActivation]): Unit = {
// Resolve the promise to send the result back to the user.
// The activation will be removed from the activation slots later, when
the completion message
// is received (because the slot in the invoker is not yet free for new
activations).
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 47b2532..513dae8 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -18,17 +18,24 @@
package org.apache.openwhisk.core.containerpool
import java.time.Instant
+
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.pattern.pipe
import pureconfig.loadConfigOrThrow
+
import scala.collection.immutable
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.{AkkaLogging, Counter, LoggingMarkers,
TransactionId}
import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.connector.{
+ ActivationMessage,
+ CombinedCompletionAndResultMessage,
+ CompletionMessage,
+ ResultMessage
+}
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
import org.apache.openwhisk.core.database.UserContext
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
@@ -36,6 +43,7 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
import org.apache.openwhisk.http.Messages
+
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
@@ -300,7 +308,7 @@ class ContainerProxy(
job.msg.blocking,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid,
- true)
+ CombinedCompletionAndResultMessage(transid, activation,
instance))
storeActivation(transid, activation, context)
}
.flatMap { container =>
@@ -628,8 +636,15 @@ class ContainerProxy(
// completion message which frees a load balancer slot is sent after the
active ack future
// completes to ensure proper ordering.
val sendResult = if (job.msg.blocking) {
- activation.map(
- sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex,
job.msg.user.namespace.uuid, false))
+ activation.map { result =>
+ sendActiveAck(
+ tid,
+ result,
+ job.msg.blocking,
+ job.msg.rootControllerIndex,
+ job.msg.user.namespace.uuid,
+ ResultMessage(tid, result))
+ }
} else {
// For non-blocking request, do not forward the result.
Future.successful(())
@@ -673,7 +688,7 @@ class ContainerProxy(
job.msg.blocking,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid,
- true))
+ CompletionMessage(tid, activation, instance)))
// Storing the record. Entirely asynchronous and not waited upon.
storeActivation(tid, activation, context)
}
@@ -699,7 +714,7 @@ object ContainerProxy {
ByteSize,
Int,
Option[ExecutableWhiskAction]) => Future[Container],
- ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID,
Boolean) => Future[Any],
+ ack: ActiveAck,
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 14ccce3..86a44cb 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -27,7 +27,7 @@ import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.openwhisk.common._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
-import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, _}
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{UserContext, _}
@@ -49,14 +49,21 @@ object InvokerReactive extends InvokerProvider {
* are either completion messages for an activation to indicate a resource
slot is free, or result-forwarding
* messages for continuations (e.g., sequences and conductor actions).
*
- * @param TransactionId the transaction id for the activation
- * @param WhiskActivaiton is the activation result
- * @param Boolean is true iff the activation was a blocking request
- * @param ControllerInstanceId the originating controller/loadbalancer id
- * @param UUID is the UUID for the namespace owning the activation
- * @param Boolean is true this is resource free message and false if this is
a result forwarding message
+ * @param tid the transaction id for the activation
+ * @param activation is the activation result
+ * @param blockingInvoke is true iff the activation was a blocking request
+ * @param controllerInstance the originating controller/loadbalancer id
+ * @param userId is the UUID for the namespace owning the activation
+ * @param acknowledegment the acknowledgement message to send
*/
- type ActiveAck = (TransactionId, WhiskActivation, Boolean,
ControllerInstanceId, UUID, Boolean) => Future[Any]
+ trait ActiveAck {
+ def apply(tid: TransactionId,
+ activation: WhiskActivation, // the activation property is used
primarily for testing
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any]
+ }
override def instance(
config: WhiskConfig,
@@ -145,44 +152,46 @@ class InvokerReactive(
new MessageFeed("activation", logging, consumer, maxPeek, 1.second,
processActivationMessage)
})
- /** Sends an active-ack. */
- private val ack: InvokerReactive.ActiveAck = (tid: TransactionId,
- activationResult:
WhiskActivation,
- blockingInvoke: Boolean,
- controllerInstance:
ControllerInstanceId,
- userId: UUID,
- isSlotFree: Boolean) => {
- implicit val transid: TransactionId = tid
-
- def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean =
false) = {
- val msg = if (isSlotFree) {
- val aid = res.fold(identity, _.activationId)
- val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
- CompletionMessage(transid, aid, isWhiskSystemError, instance)
- } else {
- ResultMessage(transid, res)
+ private val ack = new InvokerReactive.ActiveAck {
+ override def apply(tid: TransactionId,
+ activation: WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any] =
{
+ implicit val transid: TransactionId = tid
+
+ def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
+ producer.send(topic = "completed" + controllerInstance.asString,
msg).andThen {
+ case Success(_) =>
+ val info = if (recovery) s"recovery ${msg.messageType}" else
msg.messageType
+ logging.info(this, s"posted $info of activation
${acknowledegment.activationId}")
+ }
}
- producer.send(topic = "completed" + controllerInstance.asString,
msg).andThen {
- case Success(_) =>
- logging.info(
- this,
- s"posted ${if (recovery) "recovery" else "completion"} of
activation ${activationResult.activationId}")
+ // UserMetrics are sent, when the slot is free again. This ensures, that
all metrics are sent.
+ if (UserEvents.enabled && acknowledegment.isSlotFree.nonEmpty) {
+ acknowledegment.result match {
+ case Some(Right(activationResult: WhiskActivation)) =>
+ EventMessage.from(activationResult,
s"invoker${instance.instance}", userId) match {
+ case Success(msg) => UserEvents.send(producer, msg)
+ case Failure(t) => logging.error(this, s"activation event was
not sent: $t")
+ }
+ case _ =>
+ // all acknowledegment messages should have a result
+ logging.error(this, s"activation event was not sent because the
result is missing")
+ }
}
- }
- // UserMetrics are sent, when the slot is free again. This ensures, that
all metrics are sent.
- if (UserEvents.enabled && isSlotFree) {
- EventMessage.from(activationResult, s"invoker${instance.instance}",
userId) match {
- case Success(msg) => UserEvents.send(producer, msg)
- case Failure(t) => logging.error(this, s"activation event was not
sent: $t")
+ // An acknowledgement containing the result is only needed for blocking
invokes in order to further the
+ // continuation. A result message for a non-blocking activation is not
actually registered in the load balancer
+ // and the container proxy should not send such an acknowledgement unless
it's a blocking request. Here the code
+ // is defensive and will shrink all non-blocking acknowledgements.
+ send(if (blockingInvoke) acknowledegment else
acknowledegment.shrink).recoverWith {
+ case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
+ send(acknowledegment.shrink, recovery = true)
}
}
-
- send(Right(if (blockingInvoke) activationResult else
activationResult.withoutLogsOrResult)).recoverWith {
- case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
- send(Left(activationResult.activationId), recovery = true)
- }
}
/** Stores an activation in the database. */
@@ -262,20 +271,35 @@ class InvokerReactive(
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
}
- val context = UserContext(msg.user)
- val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
- ack(msg.transid, activation, msg.blocking,
msg.rootControllerIndex, msg.user.namespace.uuid, true)
- store(msg.transid, activation, context)
+
+ val activation = generateFallbackActivation(msg, response)
+ ack(
+ msg.transid,
+ activation,
+ msg.blocking,
+ msg.rootControllerIndex,
+ msg.user.namespace.uuid,
+ CombinedCompletionAndResultMessage(transid, activation,
instance))
+
+ store(msg.transid, activation, UserContext(msg.user))
Future.successful(())
}
} else {
// Iff the current namespace is blacklisted, an active-ack is only
produced to keep the loadbalancer protocol
// Due to the protective nature of the blacklist, a database entry
is not written.
activationFeed ! MessageFeed.Processed
+
val activation =
generateFallbackActivation(msg,
ActivationResponse.applicationError(Messages.namespacesBlacklisted))
- ack(msg.transid, activation, false, msg.rootControllerIndex,
msg.user.namespace.uuid, true)
+ ack(
+ msg.transid,
+ activation,
+ false,
+ msg.rootControllerIndex,
+ msg.user.namespace.uuid,
+ CombinedCompletionAndResultMessage(transid, activation, instance))
+
logging.warn(this, s"namespace ${msg.user.namespace.name} was
blocked in invoker.")
Future.successful(())
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala
index 602cc60..5216bb8 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/connector/tests/AcknowledgementMessageTests.scala
@@ -22,7 +22,12 @@ import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
import spray.json._
import org.apache.openwhisk.common.{TransactionId, WhiskInstants}
-import org.apache.openwhisk.core.connector.{AcknowledegmentMessage,
CompletionMessage, ResultMessage}
+import org.apache.openwhisk.core.connector.{
+ AcknowledegmentMessage,
+ CombinedCompletionAndResultMessage,
+ CompletionMessage,
+ ResultMessage
+}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size.SizeInt
@@ -35,7 +40,7 @@ import scala.util.Success
@RunWith(classOf[JUnitRunner])
class AcknowledgementMessageTests extends FlatSpec with Matchers with
WhiskInstants {
- behavior of "result message"
+ behavior of "acknowledgement message"
val defaultUserMemory: ByteSize = 1024.MB
val activation = WhiskActivation(
@@ -49,59 +54,85 @@ class AcknowledgementMessageTests extends FlatSpec with
Matchers with WhiskInsta
annotations = Parameters("limits", ActionLimits(TimeLimit(1.second),
MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
duration = Some(123))
- it should "serialize a left result message" in {
- val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+ it should "serialize and deserialize a Result message with Left result" in {
+ val m = ResultMessage(TransactionId.testing, activation).shrink
+ m.response shouldBe 'left
+ m.isSlotFree shouldBe empty
m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" ->
m.response.left.get.toJson).compactPrint
+ m.serialize shouldBe m.toJson.compactPrint
+ AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
}
- it should "serialize a right result message" in {
- val m =
- ResultMessage(TransactionId.testing, Right(activation))
+ it should "serialize and deserialize a Result message with Right result" in {
+ val m = ResultMessage(TransactionId.testing, activation)
+ m.response shouldBe 'right
+ m.isSlotFree shouldBe empty
m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" ->
m.response.right.get.toJson).compactPrint
+ AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
}
- it should "deserialize a left result message" in {
- val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
- ResultMessage.parse(m.serialize) shouldBe Success(m)
- }
-
- it should "deserialize a right result message" in {
- val m =
- ResultMessage(TransactionId.testing, Right(activation))
- ResultMessage.parse(m.serialize) shouldBe Success(m)
- }
-
- behavior of "acknowledgement message"
-
- it should "serialize a Completion message" in {
- val c = CompletionMessage(
+ it should "serialize and deserialize a Completion message" in {
+ val m = CompletionMessage(
TransactionId.testing,
ActivationId.generate(),
- false,
+ Some(false),
InvokerInstanceId(0, userMemory = defaultUserMemory))
- val m: AcknowledegmentMessage = c
- m.serialize shouldBe c.toJson.compactPrint
+ m.isSlotFree should not be empty
+ m.serialize shouldBe m.toJson.compactPrint
+ AcknowledegmentMessage.parse(m.serialize) shouldBe Success(m)
}
- it should "serialize a Result message" in {
- val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
- val m: AcknowledegmentMessage = r
- m.serialize shouldBe r.toJson.compactPrint
- }
+ it should "serialize and deserialize a CombinedCompletionAndResultMessage"
in {
+ withClue("system error false and right") {
+ val c = CombinedCompletionAndResultMessage(
+ TransactionId.testing,
+ activation,
+ InvokerInstanceId(0, userMemory = defaultUserMemory))
+ c.response shouldBe 'right
+ c.isSlotFree should not be empty
+ c.isSystemError shouldBe Some(false)
+ c.serialize shouldBe c.toJson.compactPrint
+ AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+ }
- it should "deserialize a Completion message" in {
- val c = CompletionMessage(
- TransactionId.testing,
- ActivationId.generate(),
- false,
- InvokerInstanceId(0, userMemory = defaultUserMemory))
- val m: AcknowledegmentMessage = c
- AcknowledegmentMessage.parse(m.serialize) shouldBe Success(c)
- }
+ withClue("system error true and right") {
+ val response = ActivationResponse.whiskError(JsString("error"))
+ val someActivation = activation.copy(response = response)
+ val c = CombinedCompletionAndResultMessage(
+ TransactionId.testing,
+ someActivation,
+ InvokerInstanceId(0, userMemory = defaultUserMemory))
+ c.response shouldBe 'right
+ c.isSlotFree should not be empty
+ c.isSystemError shouldBe Some(true)
+ c.serialize shouldBe c.toJson.compactPrint
+ AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+ }
+
+ withClue("system error false and left") {
+ val c = CombinedCompletionAndResultMessage(
+ TransactionId.testing,
+ activation,
+ InvokerInstanceId(0, userMemory = defaultUserMemory)).shrink
+ c.response shouldBe 'left
+ c.isSlotFree should not be empty
+ c.isSystemError shouldBe Some(false)
+ c.serialize shouldBe c.toJson.compactPrint
+ AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+ }
- it should "deserialize a Result message" in {
- val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
- val m: AcknowledegmentMessage = r
- AcknowledegmentMessage.parse(m.serialize) shouldBe Success(r)
+ withClue("system error true and left") {
+ val response = ActivationResponse.whiskError(JsString("error"))
+ val someActivation = activation.copy(response = response)
+ val c = CombinedCompletionAndResultMessage(
+ TransactionId.testing,
+ someActivation,
+ InvokerInstanceId(0, userMemory = defaultUserMemory)).shrink
+ c.response shouldBe 'left
+ c.isSlotFree should not be empty
+ c.isSystemError shouldBe Some(true)
+ c.serialize shouldBe c.toJson.compactPrint
+ AcknowledegmentMessage.parse(c.serialize) shouldBe Success(c)
+ }
}
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index e4d041d..af5df73 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,6 +18,7 @@
package org.apache.openwhisk.core.containerpool.test
import java.time.Instant
+
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.{ActorRef, ActorSystem, FSM}
import akka.stream.scaladsl.Source
@@ -26,13 +27,14 @@ import akka.util.ByteString
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicInteger
+
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.{Logging, TransactionId}
-import org.apache.openwhisk.core.connector.ActivationMessage
+import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, ActivationMessage}
import org.apache.openwhisk.core.containerpool.WarmingData
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogCollectingException
@@ -41,6 +43,9 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.invoker.InvokerReactive
+
+import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -159,22 +164,65 @@ class ContainerProxyTests
expectMsg(Transition(machine, Pausing, Paused))
}
- /** Creates an inspectable version of the ack method, which records all calls in a buffer */
- def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
- (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) =>
+ trait LoggedAcker extends InvokerReactive.ActiveAck {
+ def calls =
+ mutable.Buffer[(TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, AcknowledegmentMessage)]()
+
+ def verifyAnnotations(activation: WhiskActivation, a: ExecutableWhiskAction) = {
activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
- Future.successful(())
+ }
+ }
+
+ /** Creates an inspectable version of the ack method, which records all calls in a buffer */
+ def createAcker(a: ExecutableWhiskAction = action) = new LoggedAcker {
+ val acker = LoggedFunction {
+ (_: TransactionId,
+ activation: WhiskActivation,
+ _: Boolean,
+ _: ControllerInstanceId,
+ _: UUID,
+ _: AcknowledegmentMessage) =>
+ Future.successful(())
+ }
+
+ override def calls = acker.calls
+
+ override def apply(tid: TransactionId,
+ activation: WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any] = {
+ verifyAnnotations(activation, a)
+ acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment)
+ }
}
/** Creates an synchronized inspectable version of the ack method, which records all calls in a buffer */
- def createSyncAcker(a: ExecutableWhiskAction = action) = SynchronizedLoggedFunction {
- (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) =>
- activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
- activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
- activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
- Future.successful(())
+ def createSyncAcker(a: ExecutableWhiskAction = action) = new LoggedAcker {
+ val acker = SynchronizedLoggedFunction {
+ (_: TransactionId,
+ activation: WhiskActivation,
+ _: Boolean,
+ _: ControllerInstanceId,
+ _: UUID,
+ _: AcknowledegmentMessage) =>
+ Future.successful(())
+ }
+
+ override def calls = acker.calls
+
+ override def apply(tid: TransactionId,
+ activation: WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any] = {
+ verifyAnnotations(activation, a)
+ acker(tid, activation, blockingInvoke, controllerInstance, userId, acknowledegment)
+ }
}
/** Creates an inspectable factory */
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 7646b49..df1cc3b 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -565,8 +565,8 @@ class ShardingContainerPoolBalancerTests
def completeActivation(invoker: InvokerInstanceId, balancer: ShardingContainerPoolBalancer, aid: ActivationId) = {
//complete activation
- val ack = CompletionMessage(TransactionId.testing, aid, false, invoker).serialize
- .getBytes(StandardCharsets.UTF_8)
+ val ack =
+ CompletionMessage(TransactionId.testing, aid, Some(false), invoker).serialize.getBytes(StandardCharsets.UTF_8)
balancer.processAcknowledgement(ack)
}
}