This is an automated email from the ASF dual-hosted git repository.

vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 5747536  Free up slot in Loadbalancer after log-collection is 
finished. (#4041)
5747536 is described below

commit 57475367b509fd2d4c14f5678d0c26642c52cc91
Author: Christian Bickel <[email protected]>
AuthorDate: Wed Oct 31 14:58:17 2018 +0100

    Free up slot in Loadbalancer after log-collection is finished. (#4041)
    
    Co-authored-by: Sugandha Agrawal <[email protected]>
---
 .../main/scala/whisk/core/connector/Message.scala  |  73 ++++++++++++--
 .../ShardingContainerPoolBalancer.scala            |  65 ++++++++----
 .../whisk/core/containerpool/ContainerProxy.scala  |  23 +++--
 .../scala/whisk/core/invoker/InvokerReactive.scala |  19 +++-
 .../tests/AcknowledgementMessageTests.scala        | 109 +++++++++++++++++++++
 .../connector/tests/CompletionMessageTests.scala   |  89 -----------------
 .../containerpool/test/ContainerProxyTests.scala   |   2 +-
 7 files changed, 250 insertions(+), 130 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala 
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index ed677b1..ebfb59b 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -72,24 +72,51 @@ object ActivationMessage extends DefaultJsonProtocol {
 }
 
 /**
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
+ * new actions.
+ */
+abstract class AcknowledegmentMessage() extends Message {
+  override val transid: TransactionId
+  override def serialize: String = {
+    AcknowledegmentMessage.serdes.write(this).compactPrint
+  }
+}
+
+/**
+ * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
+ * current action, is free again (after log collection)
  */
 case class CompletionMessage(override val transid: TransactionId,
-                             response: Either[ActivationId, WhiskActivation],
+                             activationId: ActivationId,
+                             isSystemError: Boolean,
                              invoker: InvokerInstanceId)
-    extends Message {
+    extends AcknowledegmentMessage() {
 
-  override def serialize: String = {
-    CompletionMessage.serdes.write(this).compactPrint
+  override def toString = {
+    activationId.asString
   }
+}
+
+object CompletionMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+}
+
+/**
+ * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
+ * the result immediately (blocking activation).
+ * When adding fields, the serdes of the companion object must be updated also.
+ * The whisk activation field will have its logs stripped.
+ */
+case class ResultMessage(override val transid: TransactionId, response: 
Either[ActivationId, WhiskActivation])
+    extends AcknowledegmentMessage() {
 
   override def toString = {
     response.fold(l => l, r => r.activationId).asString
   }
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
+object ResultMessage extends DefaultJsonProtocol {
   implicit def eitherResponse =
     new JsonFormat[Either[ActivationId, WhiskActivation]] {
       def write(either: Either[ActivationId, WhiskActivation]) = either match {
@@ -101,12 +128,38 @@ object CompletionMessage extends DefaultJsonProtocol {
         // per the ActivationId's serializer, it is guaranteed to be a String 
even if it only consists of digits
         case _: JsString => Left(value.convertTo[ActivationId])
         case _: JsObject => Right(value.convertTo[WhiskActivation])
-        case _           => deserializationError("could not read 
CompletionMessage")
+        case _           => deserializationError("could not read 
ResultMessage")
       }
     }
 
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  private val serdes = jsonFormat3(CompletionMessage.apply)
+  def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
+  implicit val serdes = jsonFormat2(ResultMessage.apply)
+}
+
+object AcknowledegmentMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[AcknowledegmentMessage] = {
+    Try(serdes.read(msg.parseJson))
+  }
+
+  implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
+    override def write(obj: AcknowledegmentMessage): JsValue = {
+      obj match {
+        case c: CompletionMessage => c.toJson
+        case r: ResultMessage     => r.toJson
+      }
+    }
+
+    override def read(json: JsValue): AcknowledegmentMessage = {
+      json.asJsObject
+      // The field invoker is only part of the CompletionMessage. If this 
field is part of the JSON, we try to convert
+      // it to a CompletionMessage. Otherwise to a ResultMessage.
+      // If both conversions fail, an error will be thrown that needs to be 
handled.
+        .getFields("invoker")
+        .headOption
+        .map(_ => json.convertTo[CompletionMessage])
+        .getOrElse(json.convertTo[ResultMessage])
+    }
+  }
 }
 
 case class PingMessage(instance: InvokerInstanceId) extends Message {
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 66f66a2..1769904 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -279,12 +279,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Con
 
     // Install a timeout handler for the catastrophic case where an active ack 
is not received at all
     // (because say an invoker is down completely, or the connection to the 
message bus is disrupted) or when
-    // the active ack is significantly delayed (possibly dues to long queues 
but the subject should not be penalized);
+    // the completion ack is significantly delayed (possibly dues to long 
queues but the subject should not be penalized);
     // in this case, if the activation handler is still registered, remove it 
and update the books.
     activations.getOrElseUpdate(
       msg.activationId, {
         val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
-          processCompletion(Left(msg.activationId), msg.transid, forced = 
true, invoker = instance)
+          processCompletion(msg.activationId, msg.transid, forced = true, 
isSystemError = false, invoker = instance)
         }
 
         // please note: timeoutHandler.cancel must be called on all 
non-timeout paths, e.g. Success
@@ -344,36 +344,61 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Con
       activeAckConsumer,
       maxActiveAcksPerPoll,
       activeAckPollDuration,
-      processActiveAck)
+      processAcknowledgement)
   })
 
-  /** 4. Get the active-ack message and parse it */
-  private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
+  /** 4. Get the acknowledgement message and parse it */
+  private def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = 
Future {
     val raw = new String(bytes, StandardCharsets.UTF_8)
-    CompletionMessage.parse(raw) match {
+    AcknowledegmentMessage.parse(raw) match {
       case Success(m: CompletionMessage) =>
-        processCompletion(m.response, m.transid, forced = false, invoker = 
m.invoker)
+        processCompletion(
+          m.activationId,
+          m.transid,
+          forced = false,
+          isSystemError = m.isSystemError,
+          invoker = m.invoker)
+        activationFeed ! MessageFeed.Processed
+
+      case Success(m: ResultMessage) =>
+        processResult(m.response, m.transid)
         activationFeed ! MessageFeed.Processed
 
       case Failure(t) =>
         activationFeed ! MessageFeed.Processed
-        logging.error(this, s"failed processing message: $raw with $t")
+        logging.error(this, s"failed processing message: $raw")
+
+      case _ =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"Unexpected Acknowledgment message received by 
loadbalancer: $raw")
+    }
+  }
+
+  /** 5. Process the result ack and return it to the user */
+  private def processResult(response: Either[ActivationId, WhiskActivation], 
tid: TransactionId): Unit = {
+    val aid = response.fold(l => l, r => r.activationId)
+
+    // Resolve the promise to send the result back to the user
+    // The activation will be removed from `activations`-map later, when we 
receive the completion message, because the
+    // slot of the invoker is not yet free for new activations.
+    activations.get(aid).map { entry =>
+      entry.promise.trySuccess(response)
     }
+    logging.info(this, s"received result ack for '$aid'")(tid)
   }
 
-  /** 5. Process the active-ack and update the state accordingly */
-  private def processCompletion(response: Either[ActivationId, 
WhiskActivation],
+  /** Process the completion ack and update the state */
+  private def processCompletion(aid: ActivationId,
                                 tid: TransactionId,
                                 forced: Boolean,
+                                isSystemError: Boolean,
                                 invoker: InvokerInstanceId): Unit = {
-    val aid = response.fold(l => l, r => r.activationId)
 
     val invocationResult = if (forced) {
       InvocationFinishedResult.Timeout
     } else {
       // If the response contains a system error, report that, otherwise 
report Success
       // Left generally is considered a Success, since that could be a message 
not fitting into Kafka
-      val isSystemError = response.fold(_ => false, _.response.isWhiskError)
       if (isSystemError) {
         InvocationFinishedResult.SystemError
       } else {
@@ -390,12 +415,16 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Con
 
         if (!forced) {
           entry.timeoutHandler.cancel()
-          entry.promise.trySuccess(response)
+          // If the action was blocking and the Resultmessage has been 
received before nothing will happen here.
+          // If the action was blocking and the ResultMessage is still 
missing, we pass the ActivationId. With this Id,
+          // the controller will get the result out of the database.
+          // If the action was non-blocking, we will close the promise here.
+          entry.promise.trySuccess(Left(aid))
         } else {
-          entry.promise.tryFailure(new Throwable("no active ack received"))
+          entry.promise.tryFailure(new Throwable("no completion ack received"))
         }
 
-        logging.info(this, s"${if (!forced) "received" else "forced"} active 
ack for '$aid'")(tid)
+        logging.info(this, s"${if (!forced) "received" else "forced"} 
completion ack for '$aid'")(tid)
         // Active acks that are received here are strictly from user actions - 
health actions are not part of
         // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
         invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
@@ -403,17 +432,17 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Con
         // Health actions do not have an ActivationEntry as they are written 
on the message bus directly. Their result
         // is important to pass to the invokerPool because they are used to 
determine if the invoker can be considered
         // healthy again.
-        logging.info(this, s"received active ack for health action on 
$invoker")(tid)
+        logging.info(this, s"received completion ack for health action on 
$invoker")(tid)
         invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
       case None if !forced =>
         // Received an active-ack that has already been taken out of the state 
because of a timeout (forced active-ack).
         // The result is ignored because a timeout has already been reported 
to the invokerPool per the force.
-        logging.debug(this, s"received active ack for '$aid' which has no 
entry")(tid)
+        logging.debug(this, s"received completion ack for '$aid' which has no 
entry")(tid)
       case None =>
         // The entry has already been removed by an active ack. This part of 
the code is reached by the timeout and can
         // happen if active-ack and timeout happen roughly at the same time 
(the timeout was triggered before the active
         // ack canceled the timer). As the active ack is already processed we 
don't have to do anything here.
-        logging.debug(this, s"forced active ack for '$aid' which has no 
entry")(tid)
+        logging.debug(this, s"forced completion ack for '$aid' which has no 
entry")(tid)
     }
   }
 
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala 
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index b34ce58..57e5c4f 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and 
could not be process
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => 
Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID) => Future[Any],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID, Boolean) => Future[Any],
   storeActivation: (TransactionId, WhiskActivation, UserContext) => 
Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
   instance: InvokerInstanceId,
@@ -168,7 +168,8 @@ class ContainerProxy(
               activation,
               job.msg.blocking,
               job.msg.rootControllerIndex,
-              job.msg.user.namespace.uuid)
+              job.msg.user.namespace.uuid,
+              true)
             storeActivation(transid, activation, context)
         }
         .flatMap { container =>
@@ -390,8 +391,10 @@ class ContainerProxy(
       }
 
     // Sending active ack. Entirely asynchronous and not waited upon.
-    activation.foreach(
-      sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, 
job.msg.user.namespace.uuid))
+    if (job.msg.blocking) {
+      activation.foreach(
+        sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, 
job.msg.user.namespace.uuid, false))
+    }
 
     val context = UserContext(job.msg.user)
 
@@ -418,8 +421,14 @@ class ContainerProxy(
         }
       }
 
-    // Storing the record. Entirely asynchronous and not waited upon.
-    activationWithLogs.map(_.fold(_.activation, 
identity)).foreach(storeActivation(tid, _, context))
+    activationWithLogs
+      .map(_.fold(_.activation, identity))
+      .foreach { activation =>
+        // Sending the completionMessage to the controller asynchronously.
+        sendActiveAck(tid, activation, job.msg.blocking, 
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
+        // Storing the record. Entirely asynchronous and not waited upon.
+        storeActivation(tid, activation, context)
+      }
 
     // Disambiguate activation errors and transform the Either into a 
failed/successful Future respectively.
     activationWithLogs.flatMap {
@@ -436,7 +445,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: 
FiniteDuration, paus
 object ContainerProxy {
   def props(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => 
Future[Container],
-    ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) 
=> Future[Any],
+    ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, 
Boolean) => Future[Any],
     store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
     instance: InvokerInstanceId,
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 28b0289..65d6965 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -117,11 +117,19 @@ class InvokerReactive(
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
                      controllerInstance: ControllerInstanceId,
-                     userId: UUID) => {
+                     userId: UUID,
+                     isSlotFree: Boolean) => {
     implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = 
false) = {
-      val msg = CompletionMessage(transid, res, instance)
+      val msg = if (isSlotFree) {
+        val aid = res.fold(identity, _.activationId)
+        val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
+        CompletionMessage(transid, aid, isWhiskSystemError, instance)
+      } else {
+        ResultMessage(transid, res)
+      }
+
       producer.send(topic = "completed" + controllerInstance.asString, 
msg).andThen {
         case Success(_) =>
           logging.info(
@@ -130,7 +138,8 @@ class InvokerReactive(
       }
     }
 
-    if (UserEvents.enabled) {
+    // UserMetrics are sent, when the slot is free again. This ensures, that 
all metrics are sent.
+    if (UserEvents.enabled && isSlotFree) {
       EventMessage.from(activationResult, s"invoker${instance.instance}", 
userId) match {
         case Success(msg) => UserEvents.send(producer, msg)
         case Failure(t)   => logging.error(this, s"activation event was not 
sent: $t")
@@ -223,7 +232,7 @@ class InvokerReactive(
                 val context = UserContext(msg.user)
                 val activation = generateFallbackActivation(msg, response)
                 activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.blocking, 
msg.rootControllerIndex, msg.user.namespace.uuid)
+                ack(msg.transid, activation, msg.blocking, 
msg.rootControllerIndex, msg.user.namespace.uuid, true)
                 store(msg.transid, activation, context)
                 Future.successful(())
             }
@@ -233,7 +242,7 @@ class InvokerReactive(
           activationFeed ! MessageFeed.Processed
           val activation =
             generateFallbackActivation(msg, 
ActivationResponse.applicationError(Messages.namespacesBlacklisted))
-          ack(msg.transid, activation, false, msg.rootControllerIndex, 
msg.user.namespace.uuid)
+          ack(msg.transid, activation, false, msg.rootControllerIndex, 
msg.user.namespace.uuid, true)
           logging.warn(this, s"namespace ${msg.user.namespace.name} was 
blocked in invoker.")
           Future.successful(())
         }
diff --git 
a/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
 
b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
new file mode 100644
index 0000000..c82fc66
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.connector.tests
+
+import java.time.Instant
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.connector.{AcknowledegmentMessage, CompletionMessage, 
ResultMessage}
+import whisk.core.entity._
+import whisk.core.entity.size.SizeInt
+
+import scala.concurrent.duration.DurationInt
+import scala.util.Success
+
+/**
+ * Unit tests for the AcknowledgementMessageTests object.
+ */
+@RunWith(classOf[JUnitRunner])
+class AcknowledgementMessageTests extends FlatSpec with Matchers {
+
+  behavior of "result message"
+
+  val defaultUserMemory: ByteSize = 1024.MB
+  val activation = WhiskActivation(
+    namespace = EntityPath("ns"),
+    name = EntityName("a"),
+    Subject(),
+    activationId = ActivationId.generate(),
+    start = Instant.now(),
+    end = Instant.now(),
+    response = ActivationResponse.success(Some(JsObject("res" -> 
JsNumber(1)))),
+    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), 
MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
+    duration = Some(123))
+
+  it should "serialize a left result message" in {
+    val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> 
m.response.left.get.toJson).compactPrint
+  }
+
+  it should "serialize a right result message" in {
+    val m =
+      ResultMessage(TransactionId.testing, Right(activation))
+    m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> 
m.response.right.get.toJson).compactPrint
+  }
+
+  it should "deserialize a left result message" in {
+    val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    ResultMessage.parse(m.serialize) shouldBe Success(m)
+  }
+
+  it should "deserialize a right result message" in {
+    val m =
+      ResultMessage(TransactionId.testing, Right(activation))
+    ResultMessage.parse(m.serialize) shouldBe Success(m)
+  }
+
+  behavior of "acknowledgement message"
+
+  it should "serialize a Completion message" in {
+    val c = CompletionMessage(
+      TransactionId.testing,
+      ActivationId.generate(),
+      false,
+      InvokerInstanceId(0, userMemory = defaultUserMemory))
+    val m: AcknowledegmentMessage = c
+    m.serialize shouldBe c.toJson.compactPrint
+  }
+
+  it should "serialize a Result message" in {
+    val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    val m: AcknowledegmentMessage = r
+    m.serialize shouldBe r.toJson.compactPrint
+  }
+
+  it should "deserialize a Completion message" in {
+    val c = CompletionMessage(
+      TransactionId.testing,
+      ActivationId.generate(),
+      false,
+      InvokerInstanceId(0, userMemory = defaultUserMemory))
+    val m: AcknowledegmentMessage = c
+    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(c)
+  }
+
+  it should "deserialize a Result message" in {
+    val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate()))
+    val m: AcknowledegmentMessage = r
+    AcknowledegmentMessage.parse(m.serialize) shouldBe Success(r)
+  }
+}
diff --git 
a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala 
b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
deleted file mode 100644
index ed1ac37..0000000
--- 
a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.connector.tests
-
-import java.time.Instant
-
-import scala.util.Success
-import scala.concurrent.duration.DurationInt
-
-import org.junit.runner.RunWith
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import org.scalatest.junit.JUnitRunner
-
-import spray.json._
-import whisk.common.TransactionId
-import whisk.core.connector.CompletionMessage
-import whisk.core.entity._
-import whisk.core.entity.size.SizeInt
-
-/**
- * Unit tests for the CompletionMessage object.
- */
-@RunWith(classOf[JUnitRunner])
-class CompletionMessageTests extends FlatSpec with Matchers {
-
-  behavior of "completion message"
-
-  val defaultUserMemory: ByteSize = 1024.MB
-  val activation = WhiskActivation(
-    namespace = EntityPath("ns"),
-    name = EntityName("a"),
-    Subject(),
-    activationId = ActivationId.generate(),
-    start = Instant.now(),
-    end = Instant.now(),
-    response = ActivationResponse.success(Some(JsObject("res" -> 
JsNumber(1)))),
-    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), 
MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
-    duration = Some(123))
-
-  it should "serialize a left completion message" in {
-    val m = CompletionMessage(
-      TransactionId.testing,
-      Left(ActivationId.generate()),
-      InvokerInstanceId(0, userMemory = defaultUserMemory))
-    m.serialize shouldBe JsObject(
-      "transid" -> m.transid.toJson,
-      "response" -> m.response.left.get.toJson,
-      "invoker" -> m.invoker.toJson).compactPrint
-  }
-
-  it should "serialize a right completion message" in {
-    val m =
-      CompletionMessage(TransactionId.testing, Right(activation), 
InvokerInstanceId(0, userMemory = defaultUserMemory))
-    m.serialize shouldBe JsObject(
-      "transid" -> m.transid.toJson,
-      "response" -> m.response.right.get.toJson,
-      "invoker" -> m.invoker.toJson).compactPrint
-  }
-
-  it should "deserialize a left completion message" in {
-    val m = CompletionMessage(
-      TransactionId.testing,
-      Left(ActivationId.generate()),
-      InvokerInstanceId(0, userMemory = defaultUserMemory))
-    CompletionMessage.parse(m.serialize) shouldBe Success(m)
-  }
-
-  it should "deserialize a right completion message" in {
-    val m =
-      CompletionMessage(TransactionId.testing, Right(activation), 
InvokerInstanceId(0, userMemory = defaultUserMemory))
-    CompletionMessage.parse(m.serialize) shouldBe Success(m)
-  }
-}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 920d535..24ef264 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -147,7 +147,7 @@ class ContainerProxyTests
 
   /** Creates an inspectable version of the ack method, which records all 
calls in a buffer */
   def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: 
ControllerInstanceId, _: UUID) =>
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: 
ControllerInstanceId, _: UUID, _: Boolean) =>
       activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
       activation.annotations.get("path") shouldBe 
Some(a.fullyQualifiedName(false).toString.toJson)
       activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)

Reply via email to