sven-lange-last commented on a change in pull request #4624: Combines active 
ack and slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326048416
 
 

 ##########
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##########
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-    AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
-                             activationId: ActivationId,
-                             isSystemError: Boolean,
-                             invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+                                              response: Either[ActivationId, 
WhiskActivation],
+                                              override val isSystemError: 
Option[Boolean],
+                                              invoker: InvokerInstanceId)
     extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-    activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource 
slot in the invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The 
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a 
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+                             override val activationId: ActivationId,
+                             override val isSystemError: Option[Boolean],
+                             invoker: InvokerInstanceId)
+    extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
- * the result immediately (blocking activation).
- * When adding fields, the serdes of the companion object must be updated also.
- * The whisk activation field will have its logs stripped.
+ * This message is sent from an invoker to the load balancer once an action 
result is available for blocking actions.
+ * This is part of a split phase notification, and does not indicate that the 
slot is available, which is indicated with
+ * a `CompletionMessage`. Note that activation record will not contain any 
logs from the action execution, only the result.
  */
 case class ResultMessage(override val transid: TransactionId, response: 
Either[ActivationId, WhiskActivation])
     extends AcknowledegmentMessage(transid) {
+  override def name = "result"
+  override def result = Some(response)
+  override def isSlotFree = None
+  override def isSystemError = response.fold(_ => None, a => 
Some(a.response.isWhiskError))
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = ResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = activationId.asString
+}
 
-  override def toString = {
-    response.fold(l => l, r => r.activationId).asString
-  }
+object ActivationMessage extends DefaultJsonProtocol {
+  def parse(msg: String) = Try(serdes.read(msg.parseJson))
+
+  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
+  implicit val serdes = jsonFormat11(ActivationMessage.apply)
 }
 
-object ResultMessage extends DefaultJsonProtocol {
-  implicit def eitherResponse =
-    new JsonFormat[Either[ActivationId, WhiskActivation]] {
-      def write(either: Either[ActivationId, WhiskActivation]) = either match {
-        case Right(a) => a.toJson
-        case Left(b)  => b.toJson
-      }
+object CombinedCompletionAndResultMessage extends DefaultJsonProtocol {
+  def apply(transid: TransactionId,
+            activation: WhiskActivation,
+            invoker: InvokerInstanceId): CombinedCompletionAndResultMessage = {
+    CombinedCompletionAndResultMessage(transid, Right(activation), 
Some(activation.response.isWhiskError), invoker)
+  }
+  implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+  implicit val serdes = jsonFormat4(
+    CombinedCompletionAndResultMessage
+      .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: 
Option[Boolean], _: InvokerInstanceId))
+}
 
-      def read(value: JsValue) = value match {
-        // per the ActivationId's serializer, it is guaranteed to be a String 
even if it only consists of digits
-        case _: JsString => Left(value.convertTo[ActivationId])
-        case _: JsObject => Right(value.convertTo[WhiskActivation])
-        case _           => deserializationError("could not read 
ResultMessage")
-      }
-    }
+object CompletionMessage extends DefaultJsonProtocol {
+  def apply(transid: TransactionId, activation: WhiskActivation, invoker: 
InvokerInstanceId): CompletionMessage = {
+    CompletionMessage(transid, activation.activationId, 
Some(activation.response.isWhiskError), invoker)
+  }
+  implicit val serdes = jsonFormat4(
+    CompletionMessage.apply(_: TransactionId, _: ActivationId, _: 
Option[Boolean], _: InvokerInstanceId))
+}
 
-  def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat2(ResultMessage.apply)
+object ResultMessage extends DefaultJsonProtocol {
+  def apply(transid: TransactionId, activation: WhiskActivation): 
ResultMessage =
+    ResultMessage(transid, Right(activation))
+  implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
+  implicit val serdes = jsonFormat2(ResultMessage.apply(_: TransactionId, _: 
Either[ActivationId, WhiskActivation]))
 }
 
 object AcknowledegmentMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[AcknowledegmentMessage] = {
-    Try(serdes.read(msg.parseJson))
+  def parse(msg: String): Try[AcknowledegmentMessage] = 
Try(serdes.read(msg.parseJson))
+
+  protected[connector] val eitherResponse = new 
JsonFormat[Either[ActivationId, WhiskActivation]] {
+    def write(either: Either[ActivationId, WhiskActivation]) = 
either.fold(_.toJson, _.toJson)
+
+    def read(value: JsValue) = value match {
+      case _: JsString =>
+        // per the ActivationId serializer, an activation id is a String even 
if it only consists of digits
+        Left(value.convertTo[ActivationId])
+
+      case _: JsObject => Right(value.convertTo[WhiskActivation])
+      case _           => deserializationError("could not read ResultMessage")
+    }
   }
 
   implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
-    override def write(obj: AcknowledegmentMessage): JsValue = {
-      obj match {
-        case c: CompletionMessage => c.toJson
-        case r: ResultMessage     => r.toJson
-      }
-    }
+    override def write(m: AcknowledegmentMessage): JsValue = m.toJson
 
+    // The field invoker is only part of CombinedCompletionAndResultMessage 
and CompletionMessage.
+    // If this field is part of the JSON, we try to deserialize into one of 
these two types,
+    // and otherwise to a ResultMessage. If all conversions fail, an error 
will be thrown that needs to be handled.
     override def read(json: JsValue): AcknowledegmentMessage = {
-      json.asJsObject
-      // The field invoker is only part of the CompletionMessage. If this 
field is part of the JSON, we try to convert
-      // it to a CompletionMessage. Otherwise to a ResultMessage.
-      // If both conversions fail, an error will be thrown that needs to be 
handled.
+      val obj = json.asJsObject
+
+      obj
         .getFields("invoker")
         .headOption
-        .map(_ => json.convertTo[CompletionMessage])
+        .map(_ => {
 
 Review comment:
   From a code maintainability and data modelling perspective, it would be 
great to also serialise a `type` field that can be used later on to easily 
de-serialise the JSON. At the same time, this extension increases message size.
   
   I guess the increased size does not really matter whenever a 
`WhiskActivation` is embedded because a `type` field is pretty small compared 
to a `WhiskActivation`.
   
   I think the increased size would only matter for the `CompletionMessage`. 
Today, small JSONs for `CompletionMessage` would look like (146 Byte in compact 
form):
   
   ```
   
{"transid":"5808a97c269295220a6d8e78508f118b","activationId":"0f3763366aba46a0b763366abae6a0a4","invoker":{"instance":0,"userMemory":"16384
 MB"}}
   ```
   
   When adding a `type` field, we get (166 Byte in compact form):
   
   ```
   
{"type":"completion","transid":"5808a97c269295220a6d8e78508f118b","activationId":"0f3763366aba46a0b763366abae6a0a4","invoker":{"instance":0,"userMemory":"16384
 MB"}}
   ```
   
   This is a size increase of around 14% - which could be compensated by 
refactoring user memory handling such that it's no more serialised in the 
`CompletionMessage` - but only in pings.
   
   So I support @chetanmeh's proposal.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to