This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8759cad Change AcknowledegmentMessage interface (#4898)
8759cad is described below
commit 8759cadeea6101a3693aac1d34d4069f516d5558
Author: 김건희 <[email protected]>
AuthorDate: Wed Jun 3 10:57:53 2020 +0900
Change AcknowledegmentMessage interface (#4898)
* Add instance type to serde in InstanceId
* Add serde test case
* Fix type mismatch
---
.../apache/openwhisk/core/connector/Message.scala | 32 ++++----
.../apache/openwhisk/core/entity/InstanceId.scala | 86 +++++++++++++++++++++-
.../core/loadBalancer/CommonLoadBalancer.scala | 25 ++++---
.../entity/test/ControllerInstanceIdTests.scala | 12 ++-
.../core/entity/test/InvokerInstanceIdTests.scala | 69 +++++++++++++++++
5 files changed, 193 insertions(+), 31 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 0ab248a..5ca2a71 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -83,7 +83,7 @@ abstract class AcknowledegmentMessage(private val tid:
TransactionId) extends Me
def messageType: String
/** Does message indicate slot is free? */
- def isSlotFree: Option[InvokerInstanceId]
+ def isSlotFree: Option[InstanceId]
/** Does message contain a result? */
def result: Option[Either[ActivationId, WhiskActivation]]
@@ -117,11 +117,11 @@ abstract class AcknowledegmentMessage(private val tid:
TransactionId) extends Me
case class CombinedCompletionAndResultMessage private (override val transid:
TransactionId,
response:
Either[ActivationId, WhiskActivation],
override val
isSystemError: Option[Boolean],
- invoker:
InvokerInstanceId)
+ instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "combined"
override def result = Some(response)
- override def isSlotFree = Some(invoker)
+ override def isSlotFree = Some(instance)
override def activationId = response.fold(identity, _.activationId)
override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
override def shrink = copy(response = response.flatMap(a =>
Left(a.activationId)))
@@ -137,11 +137,11 @@ case class CombinedCompletionAndResultMessage private
(override val transid: Tra
case class CompletionMessage private (override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError:
Option[Boolean],
- invoker: InvokerInstanceId)
+ instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "completion"
override def result = None
- override def isSlotFree = Some(invoker)
+ override def isSlotFree = Some(instance)
override def toJson = CompletionMessage.serdes.write(this)
override def shrink = this
override def toString = activationId.asString
@@ -179,18 +179,18 @@ object CombinedCompletionAndResultMessage extends
DefaultJsonProtocol {
private def apply(transid: TransactionId,
activation: Either[ActivationId, WhiskActivation],
isSystemError: Option[Boolean],
- invoker: InvokerInstanceId):
CombinedCompletionAndResultMessage =
- new CombinedCompletionAndResultMessage(transid, activation, isSystemError,
invoker)
+ instance: InstanceId): CombinedCompletionAndResultMessage =
+ new CombinedCompletionAndResultMessage(transid, activation, isSystemError,
instance)
def apply(transid: TransactionId,
activation: WhiskActivation,
- invoker: InvokerInstanceId): CombinedCompletionAndResultMessage =
- new CombinedCompletionAndResultMessage(transid, Right(activation),
Some(activation.response.isWhiskError), invoker)
+ instance: InstanceId): CombinedCompletionAndResultMessage =
+ new CombinedCompletionAndResultMessage(transid, Right(activation),
Some(activation.response.isWhiskError), instance)
implicit private val eitherSerdes = AcknowledegmentMessage.eitherResponse
implicit val serdes = jsonFormat4(
CombinedCompletionAndResultMessage
- .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _:
Option[Boolean], _: InvokerInstanceId))
+ .apply(_: TransactionId, _: Either[ActivationId, WhiskActivation], _:
Option[Boolean], _: InstanceId))
}
object CompletionMessage extends DefaultJsonProtocol {
@@ -198,15 +198,15 @@ object CompletionMessage extends DefaultJsonProtocol {
private def apply(transid: TransactionId,
activation: WhiskActivation,
isSystemError: Option[Boolean],
- invoker: InvokerInstanceId): CompletionMessage =
- new CompletionMessage(transid, activation.activationId,
Some(activation.response.isWhiskError), invoker)
+ instance: InstanceId): CompletionMessage =
+ new CompletionMessage(transid, activation.activationId,
Some(activation.response.isWhiskError), instance)
- def apply(transid: TransactionId, activation: WhiskActivation, invoker:
InvokerInstanceId): CompletionMessage = {
- new CompletionMessage(transid, activation.activationId,
Some(activation.response.isWhiskError), invoker)
+ def apply(transid: TransactionId, activation: WhiskActivation, instance:
InstanceId): CompletionMessage = {
+ new CompletionMessage(transid, activation.activationId,
Some(activation.response.isWhiskError), instance)
}
implicit val serdes = jsonFormat4(
- CompletionMessage.apply(_: TransactionId, _: ActivationId, _:
Option[Boolean], _: InvokerInstanceId))
+ CompletionMessage.apply(_: TransactionId, _: ActivationId, _:
Option[Boolean], _: InstanceId))
}
object ResultMessage extends DefaultJsonProtocol {
@@ -245,7 +245,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
// and otherwise to a ResultMessage. If all conversions fail, an error
will be thrown that needs to be handled.
override def read(json: JsValue): AcknowledegmentMessage = {
val JsObject(fields) = json
- val completion = fields.contains("invoker")
+ val completion = fields.contains("instance")
val result = fields.contains("response")
if (completion && result) {
json.convertTo[CombinedCompletionAndResultMessage]
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
index aa31799..5db91c7 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
@@ -17,7 +17,11 @@
package org.apache.openwhisk.core.entity
-import spray.json.DefaultJsonProtocol
+import spray.json.{deserializationError, DefaultJsonProtocol, JsNumber,
JsObject, JsString, JsValue, RootJsonFormat}
+import spray.json._
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Try
/**
* An instance id representing an invoker
@@ -38,6 +42,8 @@ case class InvokerInstanceId(val instance: Int,
override val source = s"$instanceType$instance"
override val toString: String = (Seq("invoker" + instance) ++ uniqueName ++
displayedName).mkString("/")
+
+ override val toJson: JsValue = InvokerInstanceId.serdes.write(this)
}
case class ControllerInstanceId(asString: String) extends InstanceId {
@@ -47,15 +53,61 @@ case class ControllerInstanceId(asString: String) extends
InstanceId {
override val source = s"$instanceType$asString"
override val toString: String = source
+
+ override val toJson: JsValue = ControllerInstanceId.serdes.write(this)
}
object InvokerInstanceId extends DefaultJsonProtocol {
- import org.apache.openwhisk.core.entity.size.{serdes => xserds}
- implicit val serdes = jsonFormat(InvokerInstanceId.apply, "instance",
"uniqueName", "displayedName", "userMemory")
+ def parse(c: String): Try[InvokerInstanceId] = Try(serdes.read(c.parseJson))
+
+ implicit val serdes = new RootJsonFormat[InvokerInstanceId] {
+ override def write(i: InvokerInstanceId): JsValue = {
+ val fields = new ListBuffer[(String, JsValue)]
+ fields ++= List("instance" -> JsNumber(i.instance))
+ fields ++= List("userMemory" -> JsString(i.userMemory.toString))
+ fields ++= List("instanceType" -> JsString(i.instanceType))
+ i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" ->
JsString(uniqueName)))
+ i.displayedName.foreach(displayedName => fields ++= List("displayedName"
-> JsString(displayedName)))
+ JsObject(fields.toSeq: _*)
+ }
+
+ override def read(json: JsValue): InvokerInstanceId = {
+ val instance = fromField[Int](json, "instance")
+ val uniqueName = fromField[Option[String]](json, "uniqueName")
+ val displayedName = fromField[Option[String]](json, "displayedName")
+ val userMemory = fromField[String](json, "userMemory")
+ val instanceType = fromField[String](json, "instanceType")
+
+ if (instanceType == "invoker") {
+ new InvokerInstanceId(instance, uniqueName, displayedName,
ByteSize.fromString(userMemory))
+ } else {
+ deserializationError("could not read InvokerInstanceId")
+ }
+ }
+ }
+
}
object ControllerInstanceId extends DefaultJsonProtocol {
- implicit val serdes = jsonFormat(ControllerInstanceId.apply _, "asString")
+ def parse(c: String): Try[ControllerInstanceId] =
Try(serdes.read(c.parseJson))
+
+ implicit val serdes = new RootJsonFormat[ControllerInstanceId] {
+ override def write(c: ControllerInstanceId): JsValue =
+ JsObject("asString" -> JsString(c.asString), "instanceType" ->
JsString(c.instanceType))
+
+ override def read(json: JsValue): ControllerInstanceId = {
+ json.asJsObject.getFields("asString", "instanceType") match {
+ case Seq(JsString(asString), JsString(instanceType)) =>
+ if (instanceType == "controller") {
+ new ControllerInstanceId(asString)
+ } else {
+ deserializationError("could not read ControllerInstanceId")
+ }
+ case _ =>
+ deserializationError("could not read ControllerInstanceId")
+ }
+ }
+ }
}
trait InstanceId {
@@ -67,6 +119,8 @@ trait InstanceId {
// reserve some number of characters as the prefix to be added to topic names
private val MAX_NAME_LENGTH = 249 - 121
+ def serialize: String = InstanceId.serdes.write(this).compactPrint
+
def validate(asString: String): Unit =
require(
asString.length <= MAX_NAME_LENGTH && asString.matches(LEGAL_CHARS),
@@ -76,4 +130,28 @@ trait InstanceId {
val source: String
+ val toJson: JsValue
+}
+
+object InstanceId extends DefaultJsonProtocol {
+ def parse(i: String): Try[InstanceId] = Try(serdes.read(i.parseJson))
+
+ implicit val serdes = new RootJsonFormat[InstanceId] {
+ override def write(i: InstanceId): JsValue = i.toJson
+
+ override def read(json: JsValue): InstanceId = {
+ val JsObject(field) = json
+ field
+ .get("instanceType")
+ .map(_.convertTo[String] match {
+ case "invoker" =>
+ json.convertTo[InvokerInstanceId]
+ case "controller" =>
+ json.convertTo[ControllerInstanceId]
+ case _ =>
+ deserializationError("could not read InstanceId")
+ })
+ .getOrElse(deserializationError("could not read InstanceId"))
+ }
+ }
}
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 455399d..3d0cc2f 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -148,7 +148,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
activationSlots.getOrElseUpdate(
msg.activationId, {
val timeoutHandler =
actorSystem.scheduler.scheduleOnce(completionAckTimeout) {
- processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError = false, invoker = instance)
+ processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError = false, instance = instance)
}
// please note: timeoutHandler.cancel must be called on all
non-timeout paths, e.g. Success
@@ -206,13 +206,13 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
val raw = new String(bytes, StandardCharsets.UTF_8)
AcknowledegmentMessage.parse(raw) match {
case Success(acknowledegment) =>
- acknowledegment.isSlotFree.foreach { invoker =>
+ acknowledegment.isSlotFree.foreach { instance =>
processCompletion(
acknowledegment.activationId,
acknowledegment.transid,
forced = false,
isSystemError = acknowledegment.isSystemError.getOrElse(false),
- invoker)
+ instance)
}
acknowledegment.result.foreach { response =>
@@ -261,7 +261,12 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
tid: TransactionId,
forced: Boolean,
isSystemError: Boolean,
- invoker: InvokerInstanceId):
Unit = {
+ instance: InstanceId): Unit = {
+
+ val invoker = instance match {
+ case i: InvokerInstanceId => Some(i)
+ case _ => None
+ }
val invocationResult = if (forced) {
InvocationFinishedResult.Timeout
@@ -283,7 +288,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
totalActivationMemory.add(entry.memoryLimit.toMB * (-1))
activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
- releaseInvoker(invoker, entry)
+ invoker.foreach(releaseInvoker(_, entry))
if (!forced) {
entry.timeoutHandler.cancel()
@@ -305,7 +310,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
val completionAckTimeout =
calculateCompletionAckTimeout(entry.timeLimit)
logging.warn(
this,
- s"forced completion ack for '$aid', action
'${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit
${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms,
completion ack timeout $completionAckTimeout from $invoker")(
+ s"forced completion ack for '$aid', action
'${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit
${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms,
completion ack timeout $completionAckTimeout from $instance")(
tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
@@ -314,17 +319,17 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
// Completion acks that are received here are strictly from user
actions - health actions are not part of
// the load balancer's activation map. Inform the invoker pool
supervisor of the user action completion.
// guard this
- invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+ invoker.foreach(invokerPool ! InvocationFinishedMessage(_,
invocationResult))
case None if tid == TransactionId.invokerHealth =>
// Health actions do not have an ActivationEntry as they are written
on the message bus directly. Their result
// is important to pass to the invokerPool because they are used to
determine if the invoker can be considered
// healthy again.
- logging.info(this, s"received completion ack for health action on
$invoker")(tid)
+ logging.info(this, s"received completion ack for health action on
$instance")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_HEALTHCHECK)
// guard this
- invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
+ invoker.foreach(invokerPool ! InvocationFinishedMessage(_,
invocationResult))
case None if !forced =>
// Received a completion ack that has already been taken out of the
state because of a timeout (forced ack).
// The result is ignored because a timeout has already been reported
to the invokerPool per the force.
@@ -332,7 +337,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
// message - but not in time.
logging.warn(
this,
- s"received completion ack for '$aid' from $invoker which has no
entry, system error=$isSystemError")(tid)
+ s"received completion ack for '$aid' from $instance which has no
entry, system error=$isSystemError")(tid)
MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
case None =>
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala
index bf5e60d..ba8db58 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ControllerInstanceIdTests.scala
@@ -21,7 +21,10 @@ import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
-import org.apache.openwhisk.core.entity.ControllerInstanceId
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, InstanceId}
+import spray.json.{JsObject, JsString}
+
+import scala.util.Success
@RunWith(classOf[JUnitRunner])
class ControllerInstanceIdTests extends FlatSpec with Matchers {
@@ -43,4 +46,11 @@ class ControllerInstanceIdTests extends FlatSpec with
Matchers {
}
}
+ it should "serialize and deserialize ControllerInstanceId" in {
+ val i = ControllerInstanceId("controller0")
+ i.serialize shouldBe JsObject("asString" -> JsString(i.asString),
"instanceType" -> JsString(i.instanceType)).compactPrint
+ i.serialize shouldBe i.toJson.compactPrint
+ InstanceId.parse(i.serialize) shouldBe Success(i)
+ }
+
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
new file mode 100644
index 0000000..b0a1016
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.entity.test
+
+import org.apache.openwhisk.core.entity.size.SizeInt
+import org.apache.openwhisk.core.entity.{ByteSize, InstanceId,
InvokerInstanceId}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.{JsNumber, JsObject, JsString}
+
+import scala.util.Success
+
+@RunWith(classOf[JUnitRunner])
+class InvokerInstanceIdTests extends FlatSpec with Matchers {
+
+ behavior of "InvokerInstanceIdTests"
+
+ val defaultUserMemory: ByteSize = 1024.MB
+ it should "serialize and deserialize InvokerInstanceId" in {
+ val i = InvokerInstanceId(0, userMemory = defaultUserMemory)
+ i.serialize shouldBe JsObject(
+ "instance" -> JsNumber(i.instance),
+ "userMemory" -> JsString(i.userMemory.toString),
+ "instanceType" -> JsString(i.instanceType)).compactPrint
+ i.serialize shouldBe i.toJson.compactPrint
+ InstanceId.parse(i.serialize) shouldBe Success(i)
+ }
+
+ it should "serialize and deserialize InvokerInstanceId with optional field"
in {
+ val i1 = InvokerInstanceId(0, uniqueName = Some("uniqueInvoker"),
userMemory = defaultUserMemory)
+ i1.serialize shouldBe JsObject(
+ "instance" -> JsNumber(i1.instance),
+ "userMemory" -> JsString(i1.userMemory.toString),
+ "instanceType" -> JsString(i1.instanceType),
+ "uniqueName" -> JsString(i1.uniqueName.getOrElse(""))).compactPrint
+ i1.serialize shouldBe i1.toJson.compactPrint
+ InstanceId.parse(i1.serialize) shouldBe Success(i1)
+
+ val i2 = InvokerInstanceId(
+ 0,
+ uniqueName = Some("uniqueInvoker"),
+ displayedName = Some("displayedInvoker"),
+ userMemory = defaultUserMemory)
+ i2.serialize shouldBe JsObject(
+ "instance" -> JsNumber(i2.instance),
+ "userMemory" -> JsString(i2.userMemory.toString),
+ "instanceType" -> JsString(i2.instanceType),
+ "uniqueName" -> JsString(i2.uniqueName.getOrElse("")),
+ "displayedName" -> JsString(i2.displayedName.getOrElse(""))).compactPrint
+ i2.serialize shouldBe i2.toJson.compactPrint
+ InstanceId.parse(i2.serialize) shouldBe Success(i2)
+ }
+}