This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8759cad  Change AcknowledegmentMessage interface (#4898)
8759cad is described below

commit 8759cadeea6101a3693aac1d34d4069f516d5558
Author: 김건희 <[email protected]>
AuthorDate: Wed Jun 3 10:57:53 2020 +0900

    Change AcknowledegmentMessage interface (#4898)
    
    * Add instance type to serde in InstanceId
    
    * Add serde test case
    
    * Fix type mismatch
---
 .../apache/openwhisk/core/connector/Message.scala  | 32 ++++----
 .../apache/openwhisk/core/entity/InstanceId.scala  | 86 +++++++++++++++++++++-
 .../core/loadBalancer/CommonLoadBalancer.scala     | 25 ++++---
 .../entity/test/ControllerInstanceIdTests.scala    | 12 ++-
 .../core/entity/test/InvokerInstanceIdTests.scala  | 69 +++++++++++++++++
 5 files changed, 193 insertions(+), 31 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 0ab248a..5ca2a71 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -83,7 +83,7 @@ abstract class AcknowledegmentMessage(private val tid: 
TransactionId) extends Me
   def messageType: String
 
   /** Does message indicate slot is free? */
-  def isSlotFree: Option[InvokerInstanceId]
+  def isSlotFree: Option[InstanceId]
 
   /** Does message contain a result? */
   def result: Option[Either[ActivationId, WhiskActivation]]
@@ -117,11 +117,11 @@ abstract class AcknowledegmentMessage(private val tid: 
TransactionId) extends Me
 case class CombinedCompletionAndResultMessage private (override val transid: 
TransactionId,
                                                        response: 
Either[ActivationId, WhiskActivation],
                                                        override val 
isSystemError: Option[Boolean],
-                                                       invoker: 
InvokerInstanceId)
+                                                       instance: InstanceId)
     extends AcknowledegmentMessage(transid) {
   override def messageType = "combined"
   override def result = Some(response)
-  override def isSlotFree = Some(invoker)
+  override def isSlotFree = Some(instance)
   override def activationId = response.fold(identity, _.activationId)
   override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
   override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
@@ -137,11 +137,11 @@ case class CombinedCompletionAndResultMessage private 
(override val transid: Tra
 case class CompletionMessage private (override val transid: TransactionId,
                                       override val activationId: ActivationId,
                                       override val isSystemError: 
Option[Boolean],
-                                      invoker: InvokerInstanceId)
+                                      instance: InstanceId)
     extends AcknowledegmentMessage(transid) {
   override def messageType = "completion"
   override def result = None
-  override def isSlotFree = Some(invoker)
+  override def isSlotFree = Some(instance)
   override def toJson = CompletionMessage.serdes.write(this)
   override def shrink = this
   override def toString = activationId.asString
@@ -179,18 +179,18 @@ object CombinedCompletionAndResultMessage extends 
DefaultJsonProtocol {
   private def apply(transid: TransactionId,
                     activation: Either[ActivationId, WhiskActivation],
                     isSystemError: Option[Boolean],
-                    invoker: InvokerInstanceId): 
CombinedCompletionAndResultMessage =
-    new CombinedCompletionAndResultMessage(transid, activation, isSystemError, 
invoker)
+                    instance: InstanceId): CombinedCompletionAndResultMessage =
+    new CombinedCompletionAndResultMessage(transid, activation, isSystemError, 
instance)
 
   def apply(transid: TransactionId,
             activation: WhiskActivation,
-            invoker: InvokerInstanceId): CombinedCompletionAndResultMessage =
-    new CombinedCompletionAndResultMessage(transid, Right(activation), 
Some(activation.response.isWhiskError), invoker)
+            instance: InstanceId): CombinedCompletionAndResultMessage =
+    new CombinedCompletionAndResultMessage(transid, Right(activation), 
Some(activation.response.isWhiskError), instance)
 
   implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
   implicit val serdes = jsonFormat4(
     CombinedCompletionAndResultMessage
-      .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: 
Option[Boolean], _: InvokerInstanceId))
+      .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _: 
Option[Boolean], _: InstanceId))
 }
 
 object CompletionMessage extends DefaultJsonProtocol {
@@ -198,15 +198,15 @@ object CompletionMessage extends DefaultJsonProtocol {
   private def apply(transid: TransactionId,
                     activation: WhiskActivation,
                     isSystemError: Option[Boolean],
-                    invoker: InvokerInstanceId): CompletionMessage =
-    new CompletionMessage(transid, activation.activationId, 
Some(activation.response.isWhiskError), invoker)
+                    instance: InstanceId): CompletionMessage =
+    new CompletionMessage(transid, activation.activationId, 
Some(activation.response.isWhiskError), instance)
 
-  def apply(transid: TransactionId, activation: WhiskActivation, invoker: 
InvokerInstanceId): CompletionMessage = {
-    new CompletionMessage(transid, activation.activationId, 
Some(activation.response.isWhiskError), invoker)
+  def apply(transid: TransactionId, activation: WhiskActivation, instance: 
InstanceId): CompletionMessage = {
+    new CompletionMessage(transid, activation.activationId, 
Some(activation.response.isWhiskError), instance)
   }
 
   implicit val serdes = jsonFormat4(
-    CompletionMessage.apply(_: TransactionId, _: ActivationId, _: 
Option[Boolean], _: InvokerInstanceId))
+    CompletionMessage.apply(_: TransactionId, _: ActivationId, _: 
Option[Boolean], _: InstanceId))
 }
 
 object ResultMessage extends DefaultJsonProtocol {
@@ -245,7 +245,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
     // and otherwise to a ResultMessage. If all conversions fail, an error 
will be thrown that needs to be handled.
     override def read(json: JsValue): AcknowledegmentMessage = {
       val JsObject(fields) = json
-      val completion = fields.contains("invoker")
+      val completion = fields.contains("instance")
       val result = fields.contains("response")
       if (completion && result) {
         json.convertTo[CombinedCompletionAndResultMessage]
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
index aa31799..5db91c7 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
@@ -17,7 +17,11 @@
 
 package org.apache.openwhisk.core.entity
 
-import spray.json.DefaultJsonProtocol
+import spray.json.{deserializationError, DefaultJsonProtocol, JsNumber, 
JsObject, JsString, JsValue, RootJsonFormat}
+import spray.json._
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Try
 
 /**
  * An instance id representing an invoker
@@ -38,6 +42,8 @@ case class InvokerInstanceId(val instance: Int,
   override val source = s"$instanceType$instance"
 
   override val toString: String = (Seq("invoker" + instance) ++ uniqueName ++ 
displayedName).mkString("/")
+
+  override val toJson: JsValue = InvokerInstanceId.serdes.write(this)
 }
 
 case class ControllerInstanceId(asString: String) extends InstanceId {
@@ -47,15 +53,61 @@ case class ControllerInstanceId(asString: String) extends 
InstanceId {
   override val source = s"$instanceType$asString"
 
   override val toString: String = source
+
+  override val toJson: JsValue = ControllerInstanceId.serdes.write(this)
 }
 
 object InvokerInstanceId extends DefaultJsonProtocol {
-  import org.apache.openwhisk.core.entity.size.{serdes => xserds}
-  implicit val serdes = jsonFormat(InvokerInstanceId.apply, "instance", 
"uniqueName", "displayedName", "userMemory")
+  def parse(c: String): Try[InvokerInstanceId] = Try(serdes.read(c.parseJson))
+
+  implicit val serdes = new RootJsonFormat[InvokerInstanceId] {
+    override def write(i: InvokerInstanceId): JsValue = {
+      val fields = new ListBuffer[(String, JsValue)]
+      fields ++= List("instance" -> JsNumber(i.instance))
+      fields ++= List("userMemory" -> JsString(i.userMemory.toString))
+      fields ++= List("instanceType" -> JsString(i.instanceType))
+      i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" -> 
JsString(uniqueName)))
+      i.displayedName.foreach(displayedName => fields ++= List("displayedName" 
-> JsString(displayedName)))
+      JsObject(fields.toSeq: _*)
+    }
+
+    override def read(json: JsValue): InvokerInstanceId = {
+      val instance = fromField[Int](json, "instance")
+      val uniqueName = fromField[Option[String]](json, "uniqueName")
+      val displayedName = fromField[Option[String]](json, "displayedName")
+      val userMemory = fromField[String](json, "userMemory")
+      val instanceType = fromField[String](json, "instanceType")
+
+      if (instanceType == "invoker") {
+        new InvokerInstanceId(instance, uniqueName, displayedName, 
ByteSize.fromString(userMemory))
+      } else {
+        deserializationError("could not read InvokerInstanceId")
+      }
+    }
+  }
+
 }
 
 object ControllerInstanceId extends DefaultJsonProtocol {
-  implicit val serdes = jsonFormat(ControllerInstanceId.apply _, "asString")
+  def parse(c: String): Try[ControllerInstanceId] = 
Try(serdes.read(c.parseJson))
+
+  implicit val serdes = new RootJsonFormat[ControllerInstanceId] {
+    override def write(c: ControllerInstanceId): JsValue =
+      JsObject("asString" -> JsString(c.asString), "instanceType" -> 
JsString(c.instanceType))
+
+    override def read(json: JsValue): ControllerInstanceId = {
+      json.asJsObject.getFields("asString", "instanceType") match {
+        case Seq(JsString(asString), JsString(instanceType)) =>
+          if (instanceType == "controller") {
+            new ControllerInstanceId(asString)
+          } else {
+            deserializationError("could not read ControllerInstanceId")
+          }
+        case _ =>
+          deserializationError("could not read ControllerInstanceId")
+      }
+    }
+  }
 }
 
 trait InstanceId {
@@ -67,6 +119,8 @@ trait InstanceId {
   // reserve some number of characters as the prefix to be added to topic names
   private val MAX_NAME_LENGTH = 249 - 121
 
+  def serialize: String = InstanceId.serdes.write(this).compactPrint
+
   def validate(asString: String): Unit =
     require(
       asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
@@ -76,4 +130,28 @@ trait InstanceId {
 
   val source: String
 
+  val toJson: JsValue
+}
+
+object InstanceId extends DefaultJsonProtocol {
+  def parse(i: String): Try[InstanceId] = Try(serdes.read(i.parseJson))
+
+  implicit val serdes = new RootJsonFormat[InstanceId] {
+    override def write(i: InstanceId): JsValue = i.toJson
+
+    override def read(json: JsValue): InstanceId = {
+      val JsObject(field) = json
+      field
+        .get("instanceType")
+        .map(_.convertTo[String] match {
+          case "invoker" =>
+            json.convertTo[InvokerInstanceId]
+          case "controller" =>
+            json.convertTo[ControllerInstanceId]
+          case _ =>
+            deserializationError("could not read InstanceId")
+        })
+        .getOrElse(deserializationError("could not read InstanceId"))
+    }
+  }
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 455399d..3d0cc2f 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -148,7 +148,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
     activationSlots.getOrElseUpdate(
       msg.activationId, {
         val timeoutHandler = 
actorSystem.scheduler.scheduleOnce(completionAckTimeout) {
-          processCompletion(msg.activationId, msg.transid, forced = true, 
isSystemError = false, invoker = instance)
+          processCompletion(msg.activationId, msg.transid, forced = true, 
isSystemError = false, instance = instance)
         }
 
         // please note: timeoutHandler.cancel must be called on all 
non-timeout paths, e.g. Success
@@ -206,13 +206,13 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
     val raw = new String(bytes, StandardCharsets.UTF_8)
     AcknowledegmentMessage.parse(raw) match {
       case Success(acknowledegment) =>
-        acknowledegment.isSlotFree.foreach { invoker =>
+        acknowledegment.isSlotFree.foreach { instance =>
           processCompletion(
             acknowledegment.activationId,
             acknowledegment.transid,
             forced = false,
             isSystemError = acknowledegment.isSystemError.getOrElse(false),
-            invoker)
+            instance)
         }
 
         acknowledegment.result.foreach { response =>
@@ -261,7 +261,12 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
                                                 tid: TransactionId,
                                                 forced: Boolean,
                                                 isSystemError: Boolean,
-                                                invoker: InvokerInstanceId): 
Unit = {
+                                                instance: InstanceId): Unit = {
+
+    val invoker = instance match {
+      case i: InvokerInstanceId => Some(i)
+      case _                    => None
+    }
 
     val invocationResult = if (forced) {
       InvocationFinishedResult.Timeout
@@ -283,7 +288,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
         totalActivationMemory.add(entry.memoryLimit.toMB * (-1))
         activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
 
-        releaseInvoker(invoker, entry)
+        invoker.foreach(releaseInvoker(_, entry))
 
         if (!forced) {
           entry.timeoutHandler.cancel()
@@ -305,7 +310,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
           val completionAckTimeout = 
calculateCompletionAckTimeout(entry.timeLimit)
           logging.warn(
             this,
-            s"forced completion ack for '$aid', action 
'${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit 
${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, 
completion ack timeout $completionAckTimeout from $invoker")(
+            s"forced completion ack for '$aid', action 
'${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit 
${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, 
completion ack timeout $completionAckTimeout from $instance")(
             tid)
 
           MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
@@ -314,17 +319,17 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
         // Completion acks that are received here are strictly from user 
actions - health actions are not part of
         // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
         // guard this
-        invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+        invoker.foreach(invokerPool ! InvocationFinishedMessage(_, 
invocationResult))
       case None if tid == TransactionId.invokerHealth =>
         // Health actions do not have an ActivationEntry as they are written 
on the message bus directly. Their result
         // is important to pass to the invokerPool because they are used to 
determine if the invoker can be considered
         // healthy again.
-        logging.info(this, s"received completion ack for health action on 
$invoker")(tid)
+        logging.info(this, s"received completion ack for health action on 
$instance")(tid)
 
         
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_HEALTHCHECK)
 
         // guard this
-        invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+        invoker.foreach(invokerPool ! InvocationFinishedMessage(_, 
invocationResult))
       case None if !forced =>
         // Received a completion ack that has already been taken out of the 
state because of a timeout (forced ack).
         // The result is ignored because a timeout has already been reported 
to the invokerPool per the force.
@@ -332,7 +337,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
         // message - but not in time.
         logging.warn(
           this,
-          s"received completion ack for '$aid' from $invoker which has no 
entry, system error=$isSystemError")(tid)
+          s"received completion ack for '$aid' from $instance which has no 
entry, system error=$isSystemError")(tid)
 
         
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
       case None =>
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala
index bf5e60d..ba8db58 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala
@@ -21,7 +21,10 @@ import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-import org.apache.openwhisk.core.entity.ControllerInstanceId
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, InstanceId}
+import spray.json.{JsObject, JsString}
+
+import scala.util.Success
 
 @RunWith(classOf[JUnitRunner])
 class ControllerInstanceIdTests extends FlatSpec with Matchers {
@@ -43,4 +46,11 @@ class ControllerInstanceIdTests extends FlatSpec with 
Matchers {
     }
   }
 
+  it should "serialize and deserialize ControllerInstanceId" in {
+    val i = ControllerInstanceId("controller0")
+    i.serialize shouldBe JsObject("asString" -> JsString(i.asString), 
"instanceType" -> JsString(i.instanceType)).compactPrint
+    i.serialize shouldBe i.toJson.compactPrint
+    InstanceId.parse(i.serialize) shouldBe Success(i)
+  }
+
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
new file mode 100644
index 0000000..b0a1016
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.entity.test
+
+import org.apache.openwhisk.core.entity.size.SizeInt
+import org.apache.openwhisk.core.entity.{ByteSize, InstanceId, 
InvokerInstanceId}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.{JsNumber, JsObject, JsString}
+
+import scala.util.Success
+
+@RunWith(classOf[JUnitRunner])
+class InvokerInstanceIdTests extends FlatSpec with Matchers {
+
+  behavior of "InvokerInstanceIdTests"
+
+  val defaultUserMemory: ByteSize = 1024.MB
+  it should "serialize and deserialize InvokerInstanceId" in {
+    val i = InvokerInstanceId(0, userMemory = defaultUserMemory)
+    i.serialize shouldBe JsObject(
+      "instance" -> JsNumber(i.instance),
+      "userMemory" -> JsString(i.userMemory.toString),
+      "instanceType" -> JsString(i.instanceType)).compactPrint
+    i.serialize shouldBe i.toJson.compactPrint
+    InstanceId.parse(i.serialize) shouldBe Success(i)
+  }
+
+  it should "serialize and deserialize InvokerInstanceId with optional field" 
in {
+    val i1 = InvokerInstanceId(0, uniqueName = Some("uniqueInvoker"), 
userMemory = defaultUserMemory)
+    i1.serialize shouldBe JsObject(
+      "instance" -> JsNumber(i1.instance),
+      "userMemory" -> JsString(i1.userMemory.toString),
+      "instanceType" -> JsString(i1.instanceType),
+      "uniqueName" -> JsString(i1.uniqueName.getOrElse(""))).compactPrint
+    i1.serialize shouldBe i1.toJson.compactPrint
+    InstanceId.parse(i1.serialize) shouldBe Success(i1)
+
+    val i2 = InvokerInstanceId(
+      0,
+      uniqueName = Some("uniqueInvoker"),
+      displayedName = Some("displayedInvoker"),
+      userMemory = defaultUserMemory)
+    i2.serialize shouldBe JsObject(
+      "instance" -> JsNumber(i2.instance),
+      "userMemory" -> JsString(i2.userMemory.toString),
+      "instanceType" -> JsString(i2.instanceType),
+      "uniqueName" -> JsString(i2.uniqueName.getOrElse("")),
+      "displayedName" -> JsString(i2.displayedName.getOrElse(""))).compactPrint
+    i2.serialize shouldBe i2.toJson.compactPrint
+    InstanceId.parse(i2.serialize) shouldBe Success(i2)
+  }
+}

Reply via email to