This is an automated email from the ASF dual-hosted git repository.
vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 5747536 Free up slot in Loadbalancer after log-collection is
finished. (#4041)
5747536 is described below
commit 57475367b509fd2d4c14f5678d0c26642c52cc91
Author: Christian Bickel <[email protected]>
AuthorDate: Wed Oct 31 14:58:17 2018 +0100
Free up slot in Loadbalancer after log-collection is finished. (#4041)
Co-authored-by: Sugandha Agrawal <[email protected]>
---
.../main/scala/whisk/core/connector/Message.scala | 73 ++++++++++++--
.../ShardingContainerPoolBalancer.scala | 65 ++++++++----
.../whisk/core/containerpool/ContainerProxy.scala | 23 +++--
.../scala/whisk/core/invoker/InvokerReactive.scala | 19 +++-
.../tests/AcknowledgementMessageTests.scala | 109 +++++++++++++++++++++
.../connector/tests/CompletionMessageTests.scala | 89 -----------------
.../containerpool/test/ContainerProxyTests.scala | 2 +-
7 files changed, 250 insertions(+), 130 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index ed677b1..ebfb59b 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -72,24 +72,51 @@ object ActivationMessage extends DefaultJsonProtocol {
}
/**
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * Message that is sent from the invoker to the controller after an action is
completed or after the slot is free again for
+ * new actions.
+ */
+abstract class AcknowledegmentMessage() extends Message {
+ override val transid: TransactionId
+ override def serialize: String = {
+ AcknowledegmentMessage.serdes.write(this).compactPrint
+ }
+}
+
+/**
+ * This message is sent from the invoker to the controller after the slot of
an invoker that has been used by the
+ * current action is free again (after log collection).
*/
case class CompletionMessage(override val transid: TransactionId,
- response: Either[ActivationId, WhiskActivation],
+ activationId: ActivationId,
+ isSystemError: Boolean,
invoker: InvokerInstanceId)
- extends Message {
+ extends AcknowledegmentMessage() {
- override def serialize: String = {
- CompletionMessage.serdes.write(this).compactPrint
+ override def toString = {
+ activationId.asString
}
+}
+
+object CompletionMessage extends DefaultJsonProtocol {
+ def parse(msg: String): Try[CompletionMessage] =
Try(serdes.read(msg.parseJson))
+ implicit val serdes = jsonFormat4(CompletionMessage.apply)
+}
+
+/**
+ * This message is sent from the invoker to the controller after action
completion if the user wants to have
+ * the result immediately (blocking activation).
+ * When adding fields, the serdes of the companion object must be updated also.
+ * The whisk activation field will have its logs stripped.
+ */
+case class ResultMessage(override val transid: TransactionId, response:
Either[ActivationId, WhiskActivation])
+ extends AcknowledegmentMessage() {
override def toString = {
response.fold(l => l, r => r.activationId).asString
}
}
-object CompletionMessage extends DefaultJsonProtocol {
+object ResultMessage extends DefaultJsonProtocol {
implicit def eitherResponse =
new JsonFormat[Either[ActivationId, WhiskActivation]] {
def write(either: Either[ActivationId, WhiskActivation]) = either match {
@@ -101,12 +128,38 @@ object CompletionMessage extends DefaultJsonProtocol {
// per the ActivationId's serializer, it is guaranteed to be a String
even if it only consists of digits
case _: JsString => Left(value.convertTo[ActivationId])
case _: JsObject => Right(value.convertTo[WhiskActivation])
- case _ => deserializationError("could not read
CompletionMessage")
+ case _ => deserializationError("could not read
ResultMessage")
}
}
- def parse(msg: String): Try[CompletionMessage] =
Try(serdes.read(msg.parseJson))
- private val serdes = jsonFormat3(CompletionMessage.apply)
+ def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
+ implicit val serdes = jsonFormat2(ResultMessage.apply)
+}
+
+object AcknowledegmentMessage extends DefaultJsonProtocol {
+ def parse(msg: String): Try[AcknowledegmentMessage] = {
+ Try(serdes.read(msg.parseJson))
+ }
+
+ implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
+ override def write(obj: AcknowledegmentMessage): JsValue = {
+ obj match {
+ case c: CompletionMessage => c.toJson
+ case r: ResultMessage => r.toJson
+ }
+ }
+
+ override def read(json: JsValue): AcknowledegmentMessage = {
+ json.asJsObject
+ // The field invoker is only part of the CompletionMessage. If this
field is part of the JSON, we try to convert
+ // it to a CompletionMessage. Otherwise to a ResultMessage.
+ // If both conversions fail, an error will be thrown that needs to be
handled.
+ .getFields("invoker")
+ .headOption
+ .map(_ => json.convertTo[CompletionMessage])
+ .getOrElse(json.convertTo[ResultMessage])
+ }
+ }
}
case class PingMessage(instance: InvokerInstanceId) extends Message {
diff --git
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 66f66a2..1769904 100644
---
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -279,12 +279,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Con
// Install a timeout handler for the catastrophic case where an active ack
is not received at all
// (because say an invoker is down completely, or the connection to the
message bus is disrupted) or when
- // the active ack is significantly delayed (possibly dues to long queues
but the subject should not be penalized);
+ // the completion ack is significantly delayed (possibly due to long
queues but the subject should not be penalized);
// in this case, if the activation handler is still registered, remove it
and update the books.
activations.getOrElseUpdate(
msg.activationId, {
val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
- processCompletion(Left(msg.activationId), msg.transid, forced =
true, invoker = instance)
+ processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError = false, invoker = instance)
}
// please note: timeoutHandler.cancel must be called on all
non-timeout paths, e.g. Success
@@ -344,36 +344,61 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Con
activeAckConsumer,
maxActiveAcksPerPoll,
activeAckPollDuration,
- processActiveAck)
+ processAcknowledgement)
})
- /** 4. Get the active-ack message and parse it */
- private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
+ /** 4. Get the acknowledgement message and parse it */
+ private def processAcknowledgement(bytes: Array[Byte]): Future[Unit] =
Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
- CompletionMessage.parse(raw) match {
+ AcknowledegmentMessage.parse(raw) match {
case Success(m: CompletionMessage) =>
- processCompletion(m.response, m.transid, forced = false, invoker =
m.invoker)
+ processCompletion(
+ m.activationId,
+ m.transid,
+ forced = false,
+ isSystemError = m.isSystemError,
+ invoker = m.invoker)
+ activationFeed ! MessageFeed.Processed
+
+ case Success(m: ResultMessage) =>
+ processResult(m.response, m.transid)
activationFeed ! MessageFeed.Processed
case Failure(t) =>
activationFeed ! MessageFeed.Processed
- logging.error(this, s"failed processing message: $raw with $t")
+ logging.error(this, s"failed processing message: $raw")
+
+ case _ =>
+ activationFeed ! MessageFeed.Processed
+ logging.error(this, s"Unexpected Acknowledgment message received by
loadbalancer: $raw")
+ }
+ }
+
+ /** 5. Process the result ack and return it to the user */
+ private def processResult(response: Either[ActivationId, WhiskActivation],
tid: TransactionId): Unit = {
+ val aid = response.fold(l => l, r => r.activationId)
+
+ // Resolve the promise to send the result back to the user
+ // The activation will be removed from `activations`-map later, when we
receive the completion message, because the
+ // slot of the invoker is not yet free for new activations.
+ activations.get(aid).map { entry =>
+ entry.promise.trySuccess(response)
}
+ logging.info(this, s"received result ack for '$aid'")(tid)
}
- /** 5. Process the active-ack and update the state accordingly */
- private def processCompletion(response: Either[ActivationId,
WhiskActivation],
+ /** Process the completion ack and update the state */
+ private def processCompletion(aid: ActivationId,
tid: TransactionId,
forced: Boolean,
+ isSystemError: Boolean,
invoker: InvokerInstanceId): Unit = {
- val aid = response.fold(l => l, r => r.activationId)
val invocationResult = if (forced) {
InvocationFinishedResult.Timeout
} else {
// If the response contains a system error, report that, otherwise
report Success
// Left generally is considered a Success, since that could be a message
not fitting into Kafka
- val isSystemError = response.fold(_ => false, _.response.isWhiskError)
if (isSystemError) {
InvocationFinishedResult.SystemError
} else {
@@ -390,12 +415,16 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Con
if (!forced) {
entry.timeoutHandler.cancel()
- entry.promise.trySuccess(response)
+ // If the action was blocking and the ResultMessage has already been
received, nothing will happen here.
+ // If the action was blocking and the ResultMessage is still
missing, we pass the ActivationId. With this Id,
+ // the controller will get the result out of the database.
+ // If the action was non-blocking, we will close the promise here.
+ entry.promise.trySuccess(Left(aid))
} else {
- entry.promise.tryFailure(new Throwable("no active ack received"))
+ entry.promise.tryFailure(new Throwable("no completion ack received"))
}
- logging.info(this, s"${if (!forced) "received" else "forced"} active
ack for '$aid'")(tid)
+ logging.info(this, s"${if (!forced) "received" else "forced"}
completion ack for '$aid'")(tid)
// Active acks that are received here are strictly from user actions -
health actions are not part of
// the load balancer's activation map. Inform the invoker pool
supervisor of the user action completion.
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
@@ -403,17 +432,17 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Con
// Health actions do not have an ActivationEntry as they are written
on the message bus directly. Their result
// is important to pass to the invokerPool because they are used to
determine if the invoker can be considered
// healthy again.
- logging.info(this, s"received active ack for health action on
$invoker")(tid)
+ logging.info(this, s"received completion ack for health action on
$invoker")(tid)
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if !forced =>
// Received an active-ack that has already been taken out of the state
because of a timeout (forced active-ack).
// The result is ignored because a timeout has already been reported
to the invokerPool per the force.
- logging.debug(this, s"received active ack for '$aid' which has no
entry")(tid)
+ logging.debug(this, s"received completion ack for '$aid' which has no
entry")(tid)
case None =>
// The entry has already been removed by an active ack. This part of
the code is reached by the timeout and can
// happen if active-ack and timeout happen roughly at the same time
(the timeout was triggered before the active
// ack canceled the timer). As the active ack is already processed we
don't have to do anything here.
- logging.debug(this, s"forced active ack for '$aid' which has no
entry")(tid)
+ logging.debug(this, s"forced completion ack for '$aid' which has no
entry")(tid)
}
}
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index b34ce58..57e5c4f 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and
could not be process
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
- sendActiveAck: (TransactionId, WhiskActivation, Boolean,
ControllerInstanceId, UUID) => Future[Any],
+ sendActiveAck: (TransactionId, WhiskActivation, Boolean,
ControllerInstanceId, UUID, Boolean) => Future[Any],
storeActivation: (TransactionId, WhiskActivation, UserContext) =>
Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
@@ -168,7 +168,8 @@ class ContainerProxy(
activation,
job.msg.blocking,
job.msg.rootControllerIndex,
- job.msg.user.namespace.uuid)
+ job.msg.user.namespace.uuid,
+ true)
storeActivation(transid, activation, context)
}
.flatMap { container =>
@@ -390,8 +391,10 @@ class ContainerProxy(
}
// Sending active ack. Entirely asynchronous and not waited upon.
- activation.foreach(
- sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex,
job.msg.user.namespace.uuid))
+ if (job.msg.blocking) {
+ activation.foreach(
+ sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex,
job.msg.user.namespace.uuid, false))
+ }
val context = UserContext(job.msg.user)
@@ -418,8 +421,14 @@ class ContainerProxy(
}
}
- // Storing the record. Entirely asynchronous and not waited upon.
- activationWithLogs.map(_.fold(_.activation,
identity)).foreach(storeActivation(tid, _, context))
+ activationWithLogs
+ .map(_.fold(_.activation, identity))
+ .foreach { activation =>
+ // Sending the completionMessage to the controller asynchronously.
+ sendActiveAck(tid, activation, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
+ // Storing the record. Entirely asynchronous and not waited upon.
+ storeActivation(tid, activation, context)
+ }
// Disambiguate activation errors and transform the Either into a
failed/successful Future respectively.
activationWithLogs.flatMap {
@@ -436,7 +445,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer:
FiniteDuration, paus
object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
- ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID)
=> Future[Any],
+ ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID,
Boolean) => Future[Any],
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
diff --git
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 28b0289..65d6965 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -117,11 +117,19 @@ class InvokerReactive(
activationResult: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
- userId: UUID) => {
+ userId: UUID,
+ isSlotFree: Boolean) => {
implicit val transid: TransactionId = tid
def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean =
false) = {
- val msg = CompletionMessage(transid, res, instance)
+ val msg = if (isSlotFree) {
+ val aid = res.fold(identity, _.activationId)
+ val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
+ CompletionMessage(transid, aid, isWhiskSystemError, instance)
+ } else {
+ ResultMessage(transid, res)
+ }
+
producer.send(topic = "completed" + controllerInstance.asString,
msg).andThen {
case Success(_) =>
logging.info(
@@ -130,7 +138,8 @@ class InvokerReactive(
}
}
- if (UserEvents.enabled) {
+ // UserMetrics are sent when the slot is free again. This ensures that
all metrics are sent.
+ if (UserEvents.enabled && isSlotFree) {
EventMessage.from(activationResult, s"invoker${instance.instance}",
userId) match {
case Success(msg) => UserEvents.send(producer, msg)
case Failure(t) => logging.error(this, s"activation event was not
sent: $t")
@@ -223,7 +232,7 @@ class InvokerReactive(
val context = UserContext(msg.user)
val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
- ack(msg.transid, activation, msg.blocking,
msg.rootControllerIndex, msg.user.namespace.uuid)
+ ack(msg.transid, activation, msg.blocking,
msg.rootControllerIndex, msg.user.namespace.uuid, true)
store(msg.transid, activation, context)
Future.successful(())
}
@@ -233,7 +242,7 @@ class InvokerReactive(
activationFeed ! MessageFeed.Processed
val activation =
generateFallbackActivation(msg,
ActivationResponse.applicationError(Messages.namespacesBlacklisted))
- ack(msg.transid, activation, false, msg.rootControllerIndex,
msg.user.namespace.uuid)
+ ack(msg.transid, activation, false, msg.rootControllerIndex,
msg.user.namespace.uuid, true)
logging.warn(this, s"namespace ${msg.user.namespace.name} was
blocked in invoker.")
Future.successful(())
}
diff --git
a/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
new file mode 100644
index 0000000..c82fc66
--- /dev/null
+++
b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.tests
+
+import java.time.Instant
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.connector.{AcknowledegmentMessage, CompletionMessage,
ResultMessage}
+import whisk.core.entity._
+import whisk.core.entity.size.SizeInt
+
+import scala.concurrent.duration.DurationInt
+import scala.util.Success
+
+/**
+ * Unit tests for (de)serialization of ResultMessage and AcknowledegmentMessage.
+ */
+@RunWith(classOf[JUnitRunner])
+class AcknowledgementMessageTests extends FlatSpec with Matchers {
+
+ behavior of "result message"
+
+ val defaultUserMemory: ByteSize = 1024.MB
+ val activation = WhiskActivation(
+ namespace = EntityPath("ns"),
+ name = EntityName("a"),
+ Subject(),
+ activationId = ActivationId.generate(),
+ start = Instant.now(),
+ end = Instant.now(),
+ response = ActivationResponse.success(Some(JsObject("res" ->
JsNumber(1)))),
+ annotations = Parameters("limits", ActionLimits(TimeLimit(1.second),
MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
+ duration = Some(123))
+
+ it should "serialize a left result message" in {
+ val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+ m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" ->
m.response.left.get.toJson).compactPrint
+ }
+
+ it should "serialize a right result message" in {
+ val m =
+ ResultMessage(TransactionId.testing, Right(activation))
+ m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" ->
m.response.right.get.toJson).compactPrint
+ }
+
+ it should "deserialize a left result message" in {
+ val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+ ResultMessage.parse(m.serialize) shouldBe Success(m)
+ }
+
+ it should "deserialize a right result message" in {
+ val m =
+ ResultMessage(TransactionId.testing, Right(activation))
+ ResultMessage.parse(m.serialize) shouldBe Success(m)
+ }
+
+ behavior of "acknowledgement message"
+
+ it should "serialize a Completion message" in {
+ val c = CompletionMessage(
+ TransactionId.testing,
+ ActivationId.generate(),
+ false,
+ InvokerInstanceId(0, userMemory = defaultUserMemory))
+ val m: AcknowledegmentMessage = c
+ m.serialize shouldBe c.toJson.compactPrint
+ }
+
+ it should "serialize a Result message" in {
+ val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+ val m: AcknowledegmentMessage = r
+ m.serialize shouldBe r.toJson.compactPrint
+ }
+
+ it should "deserialize a Completion message" in {
+ val c = CompletionMessage(
+ TransactionId.testing,
+ ActivationId.generate(),
+ false,
+ InvokerInstanceId(0, userMemory = defaultUserMemory))
+ val m: AcknowledegmentMessage = c
+ AcknowledegmentMessage.parse(m.serialize) shouldBe Success(c)
+ }
+
+ it should "deserialize a Result message" in {
+ val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+ val m: AcknowledegmentMessage = r
+ AcknowledegmentMessage.parse(m.serialize) shouldBe Success(r)
+ }
+}
diff --git
a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
deleted file mode 100644
index ed1ac37..0000000
---
a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.connector.tests
-
-import java.time.Instant
-
-import scala.util.Success
-import scala.concurrent.duration.DurationInt
-
-import org.junit.runner.RunWith
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import spray.json._
-import whisk.common.TransactionId
-import whisk.core.connector.CompletionMessage
-import whisk.core.entity._
-import whisk.core.entity.size.SizeInt
-
-/**
- * Unit tests for the CompletionMessage object.
- */
-@RunWith(classOf[JUnitRunner])
-class CompletionMessageTests extends FlatSpec with Matchers {
-
- behavior of "completion message"
-
- val defaultUserMemory: ByteSize = 1024.MB
- val activation = WhiskActivation(
- namespace = EntityPath("ns"),
- name = EntityName("a"),
- Subject(),
- activationId = ActivationId.generate(),
- start = Instant.now(),
- end = Instant.now(),
- response = ActivationResponse.success(Some(JsObject("res" ->
JsNumber(1)))),
- annotations = Parameters("limits", ActionLimits(TimeLimit(1.second),
MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
- duration = Some(123))
-
- it should "serialize a left completion message" in {
- val m = CompletionMessage(
- TransactionId.testing,
- Left(ActivationId.generate()),
- InvokerInstanceId(0, userMemory = defaultUserMemory))
- m.serialize shouldBe JsObject(
- "transid" -> m.transid.toJson,
- "response" -> m.response.left.get.toJson,
- "invoker" -> m.invoker.toJson).compactPrint
- }
-
- it should "serialize a right completion message" in {
- val m =
- CompletionMessage(TransactionId.testing, Right(activation),
InvokerInstanceId(0, userMemory = defaultUserMemory))
- m.serialize shouldBe JsObject(
- "transid" -> m.transid.toJson,
- "response" -> m.response.right.get.toJson,
- "invoker" -> m.invoker.toJson).compactPrint
- }
-
- it should "deserialize a left completion message" in {
- val m = CompletionMessage(
- TransactionId.testing,
- Left(ActivationId.generate()),
- InvokerInstanceId(0, userMemory = defaultUserMemory))
- CompletionMessage.parse(m.serialize) shouldBe Success(m)
- }
-
- it should "deserialize a right completion message" in {
- val m =
- CompletionMessage(TransactionId.testing, Right(activation),
InvokerInstanceId(0, userMemory = defaultUserMemory))
- CompletionMessage.parse(m.serialize) shouldBe Success(m)
- }
-}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 920d535..24ef264 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -147,7 +147,7 @@ class ContainerProxyTests
/** Creates an inspectable version of the ack method, which records all
calls in a buffer */
def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
- (_: TransactionId, activation: WhiskActivation, _: Boolean, _:
ControllerInstanceId, _: UUID) =>
+ (_: TransactionId, activation: WhiskActivation, _: Boolean, _:
ControllerInstanceId, _: UUID, _: Boolean) =>
activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
activation.annotations.get("path") shouldBe
Some(a.fullyQualifiedName(false).toString.toJson)
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)